2

深入剖析 Spring WebFlux

 2 years ago
source link: https://segmentfault.com/a/1190000040797855
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

一、WebFlux 简介

WebFlux 是 Spring Framework5.0 中引入的一种新的反应式Web框架。通过Reactor项目实现Reactive Streams规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步IO能够以少量而稳定的线程处理更高的吞吐,规避文件IO/网络IO阻塞带来的线程堆积。

1.1 WebFlux 的特性

WebFlux 具有以下特性:

  • 异步非阻塞 - 可以举一个上传例子。相对于 Spring MVC 是同步阻塞IO模型,Spring WebFlux这样处理:线程发现文件数据没传输好,就先做其他事情,当文件准备好时通知线程来处理(这里就是输入非阻塞方式),当接收完并写入磁盘(该步骤也可以采用异步非阻塞方式)完毕后再通知线程来处理响应(这里就是输出非阻塞方式)。
  • 响应式函数编程 - 相对于Java8 Stream 同步、阻塞的Pull模式,Spring Flux 采用Reactor Stream 异步、非阻塞Push模式。书写采用 Java lambda 方式,接近自然语言形式且容易理解。
  • 不拘束于Servlet - 可以运行在传统的Servlet 容器(3.1+版本),还能运行在Netty、Undertow等NIO容器中。

1.2 WebFlux 的设计目标

  • 适用高并发

二、Spring WebFlux 组件介绍

2.1 HTTPHandler

一个简单的处理请求和响应的抽象,用来适配不同HTTP服务容器的API。

2.2 WebHandler

一个用于处理业务请求抽象接口,定义了一系列处理行为。相关核心实现类如下;

2.3 DispatcherHandler

请求处理的总控制器,实际工作是由多个可配置的组件来处理。

WebFlux是兼容Spring MVC 基于@Controller,@RequestMapping等注解的编程开发方式的,可以做到平滑切换。

2.4 Functional Endpoints

这是一个轻量级函数编程模型。是基于@Controller,@RequestMapping等注解的编程模型的替代方案,提供一套函数式API 用于创建Router,Handler和Filter。调用处理组件如下:

简单的RouterFuntion 路由注册和业务处理过程:

@Bean
public RouterFunction<ServerResponse> initRouterFunction() {
    return RouterFunctions.route()
        .GET("/hello/{name}", serverRequest -> {
            String name = serverRequest.pathVariable("name");
            return ServerResponse.ok().bodyValue(name);
        }).build();
}

请求转发处理过程:

2.5 Reactive Stream

这是一个重要的组件,WebFlux 就是利用Reactor 来重写了传统Spring MVC 逻辑。其中Flux和Mono 是Reactor中两个关键概念。掌握了这两个概念才能理解WebFlux工作方式。

Flux和Mono 都实现了Reactor的Publisher接口,属于时间发布者,对消费者提供订阅接口,当有事件发生的时候,Flux或者Mono会通过回调消费者的相应的方法来通知消费者相应的事件。这就是所谓的响应式编程模型。

Mono工作流程图

只会在发送出单个结果后完成。

Flux工作流程图

发送出零个或者多个,可能无限个结果后才完成。

对于流式媒体类型:application/stream+json 或者 text/event-stream ,可以让调用端获得服务器滚动结果。
对于非流类型:application/json  WebFlux 默认JSON编码器会将序列化的JSON 一次性刷新到网络,这并不意味着阻塞,因为结果Flux<?> 是以反应式方式写入网络的,没有任何障碍。

三、WebFlux 工作原理

3.1 组件装配过程

流程相关源码解析-WebFluxAutoConfiguration

@Configuration
//条件装配 只有启动的类型是REACTIVE时加载
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
//只有存在 WebFluxConfigurer实例  时加载
@ConditionalOnClass(WebFluxConfigurer.class)
//在不存在  WebFluxConfigurationSupport实例时 加载
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
//在之后装配
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class,
      CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })
//自动装配顺序
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {
   @Configuration
   @EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })
   //接口编程 在装配WebFluxConfig 之前要先 装配EnableWebFluxConfiguration
   @Import({ EnableWebFluxConfiguration.class })
   public static class WebFluxConfig implements WebFluxConfigurer {
      //隐藏部分源码
     /**
     * Configuration equivalent to {@code @EnableWebFlux}.
     */
   } 
    @Configuration
    public static class EnableWebFluxConfiguration
            extends DelegatingWebFluxConfiguration {
        //隐藏部分代码
    }
    @Configuration
    @ConditionalOnEnabledResourceChain
    static class ResourceChainCustomizerConfiguration {
        //隐藏部分代码
    }
    private static class ResourceChainResourceHandlerRegistrationCustomizer
            implements ResourceHandlerRegistrationCustomizer {
        //隐藏部分代码
    }

WebFluxAutoConfiguration 自动装配时先自动装配EnableWebFluxConfiguration 而EnableWebFluxConfiguration->DelegatingWebFluxConfiguration ->WebFluxConfigurationSupport。

最终WebFluxConfigurationSupport 不仅配置DispatcherHandler 还同时配置了其他很多WebFlux核心组件包括 异常处理器WebExceptionHandler,映射处理器处理器HandlerMapping,请求适配器HandlerAdapter,响应处理器HandlerResultHandler 等。

DispatcherHandler 创建初始化过程如下;

public class WebFluxConfigurationSupport implements ApplicationContextAware {
   //隐藏部分代码
   @Nullable
   public final ApplicationContext getApplicationContext() {
      return this.applicationContext;
   }
    //隐藏部分代码
   @Bean
   public DispatcherHandler webHandler() {
      return new DispatcherHandler();
   }
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
   @Nullable
   private List<HandlerMapping> handlerMappings;
   @Nullable
   private List<HandlerAdapter> handlerAdapters;
   @Nullable
   private List<HandlerResultHandler> resultHandlers;
       @Override
   public void setApplicationContext(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
   }
   protected void initStrategies(ApplicationContext context) {
         //注入handlerMappings
      Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerMapping.class, true, false);

      ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
      AnnotationAwareOrderComparator.sort(mappings);
      this.handlerMappings = Collections.unmodifiableList(mappings);
       //注入handlerAdapters
      Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerAdapter.class, true, false);

      this.handlerAdapters = new ArrayList<>(adapterBeans.values());
      AnnotationAwareOrderComparator.sort(this.handlerAdapters);
      //注入resultHandlers
      Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerResultHandler.class, true, false);

      this.resultHandlers = new ArrayList<>(beans.values());
      AnnotationAwareOrderComparator.sort(this.resultHandlers);
   }

流程相关源码解析-HTTPHandlerAutoConfiguration

上面已讲解过WebFlux 核心组件装载过程,那么这些组件又是什么时候注入到对应的容器上下文中的呢?其实是在刷新容器上下文时注入进去的。

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh

public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
      implements ConfigurableWebServerApplicationContext {
   @Override
   protected void onRefresh() {
      super.onRefresh();
      try {
         createWebServer();
      }
      catch (Throwable ex) {
         throw new ApplicationContextException("Unable to start reactive web server", ex);
      }
   }
   private void createWebServer() {
      WebServerManager serverManager = this.serverManager;
      if (serverManager == null) {
         String webServerFactoryBeanName = getWebServerFactoryBeanName();
         ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
         boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
         // 这里创建容器管理时注入httpHandler
         this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
         getBeanFactory().registerSingleton("webServerGracefulShutdown",
               new WebServerGracefulShutdownLifecycle(this.serverManager));
         // 注册一个 web容器启动服务类,该类继承了SmartLifecycle
         getBeanFactory().registerSingleton("webServerStartStop",
               new WebServerStartStopLifecycle(this.serverManager));
      }
      initPropertySources();
   }
   protected HttpHandler getHttpHandler() {
        String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
        if (beanNames.length == 0) {
            throw new ApplicationContextException(
                    "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
        }
        if (beanNames.length > 1) {
            throw new ApplicationContextException(
                    "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
                            + StringUtils.arrayToCommaDelimitedString(beanNames));
        }
        //容器上下文获取httpHandler
        return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
    }

而这个HTTPHandler 是由HTTPHandlerAutoConfiguration装配进去的。

@Configuration
@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnMissingBean(HttpHandler.class)
@AutoConfigureAfter({ WebFluxAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class HttpHandlerAutoConfiguration {
   @Configuration
   public static class AnnotationConfig {
      private ApplicationContext applicationContext;
      public AnnotationConfig(ApplicationContext applicationContext) {
         this.applicationContext = applicationContext;
      }
      //构建WebHandler
      @Bean
      public HttpHandler httpHandler() {
         return WebHttpHandlerBuilder.applicationContext(this.applicationContext)
               .build();
      }
   }

流程相关源码解析-web容器

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在创建WebServerManager 容器管理器时会获取对应web容器实例,并注入响应的HTTPHandler。

class WebServerManager {
   private final ReactiveWebServerApplicationContext applicationContext;
   private final DelayedInitializationHttpHandler handler;
   private final WebServer webServer;
   WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
         Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
      this.applicationContext = applicationContext;
      Assert.notNull(factory, "Factory must not be null");
      this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
      this.webServer = factory.getWebServer(this.handler);
   }
}

以Tomcat 容器为例展示创建过程,使用的是 TomcatHTTPHandlerAdapter 来连接Servlet 请求到HTTPHandler组件。

public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {
    //隐藏部分代码   
    @Override
    public WebServer getWebServer(HttpHandler httpHandler) {
        if (this.disableMBeanRegistry) {
            Registry.disableRegistry();
        }
        Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        Connector connector = new Connector(this.protocol);
        connector.setThrowOnFailure(true);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(additionalConnector);
        }
        TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
        prepareContext(tomcat.getHost(), servlet);
        return getTomcatWebServer(tomcat);
    }
}

最后Spring容器加载后通过SmartLifecycle实现类WebServerStartStopLifecycle 来启动Web容器。

WebServerStartStopLifecycle 注册过程详见:org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer

3.2 完整请求处理流程

(引用自:https://blog.csdn.net)

该图给出了一个HTTP请求处理的调用链路。是采用Reactor Stream 方式书写,只有最终调用 subscirbe 才真正执行业务逻辑。基于WebFlux 开发时要避免controller 中存在阻塞逻辑。列举下面例子可以看到Spring MVC 和Spring Webflux 之间的请求处理区别。

@RestControllerpublic
class TestController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @GetMapping("sync")
    public String sync() {
        logger.info("sync method start");
        String result = this.execute();
        logger.info("sync method end");
        return result;
    }
    @GetMapping("async/mono")
    public Mono<String> asyncMono() {
        logger.info("async method start");
        Mono<String> result = Mono.fromSupplier(this::execute);
        logger.info("async method end");
        return result;
    }
    private String execute() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }
}
2021-05-31 20:14:52.384  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method start
2021-05-31 20:14:57.385  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method end
2021-05-31 20:15:09.659  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method start
2021-05-31 20:15:09.660  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method end

从上面例子可以看出sync() 方法阻塞了请求,而asyncMono() 没有阻塞请求并立刻返回的。asyncMono() 方法具体业务逻辑 被包裹在了Mono 中Supplier中的了。当execute 处理完业务逻辑后通过回调方式响应给浏览器。

四、存储支持

一旦控制层使用了 Spring Webflux 则安全认证层、数据访问层都必须使用 Reactive API 才真正实现异步非阻塞。

NOSQL Database

  • MongoDB (org.springframework.boot:spring-boot-starter-data-mongodb-reactive)。
  • Redis(org.springframework.boot:spring-boot-starter-data-redis-reactive)。

Relational Database

  • H2 (io.r2dbc:r2dbc-h2)
  • MariaDB (org.mariadb:r2dbc-mariadb)
  • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
  • MySQL (dev.miku:r2dbc-mysql)
  • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
  • Postgres (io.r2dbc:r2dbc-postgresql)
  • Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

关于Spring MVC 和Spring WebFlux 测评很多,本文引用下做简单说明。参考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。

基本依赖

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-r2dbc</artifactId> 
</dependency> 
<!-- r2dbc 连接池 --> 
<dependency> 
    <groupId >io.r2dbc</groupId> 
    <artifactId>r2dbc-pool</artifactId> 
</dependency> 
<!--r2dbc mysql 库--> 
<dependency> 
    <groupId>dev.miku</groupId> 
    <artifactId>r2dbc- mysql</artifactId> 
</dependency> 
<!--自动配置需要引入一个嵌入式数据库类型对象--> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-jdbc</artifactId> 
</dependency>
<!-- 反应方程式 web 框架 webflux--> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-webflux</artifactId> 
</dependency>

相同数据下效果如下

Spring MVC + JDBC 在低并发下表现最好,但 WebFlux + R2DBC 在高并发下每个处理请求使用的内存最少。

Spring WebFlux + R2DBC 在高并发下,吞吐量表现优异。

作者:vivo互联网服务器团队-Zhou Changqing


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK