5

Spring Cloud Gateway网关全局核心过滤器路由执行过程详解

 1 year ago
source link: https://www.51cto.com/article/752057.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

2 ReactiveLoadBalancerClientFilter

如果URL有一个lb(例如lb://order-service),它使用Spring Cloud ReactorLoadBalancer将名称(在本例中为order-service)解析为一个实际的主机和端口,并替换相同属性中的URI。​

public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
  private final LoadBalancerClientFactory clientFactory;
  private final GatewayLoadBalancerProperties properties;
  private final LoadBalancerProperties loadBalancerProperties;


  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 从上下文中获取,如:lb://order-service/orders
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
      return chain.filter(exchange);
    }
    // preserve the original url
    addOriginalRequestUrl(exchange, url);
    // 再次获取
    URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    // 获取服务名;order-service
    String serviceId = requestUri.getHost();
    // clientFactory.getInstances方法会从NamedContextFactory.contexts集合中查找以order-service为key对应的
    // AnnotationConfigApplicationContext,然后从这个容器中查找LoadBalancerLifecycle,默认返回{}
    // ------------------------------------------------------------
    /**
     * 每个服务对应的ApplicationContext包含如下13个Bean
     * org.springframework.context.annotation.internalConfigurationAnnotationProcessor
     * org.springframework.context.annotation.internalAutowiredAnnotationProcessor
     * org.springframework.context.annotation.internalCommonAnnotationProcessor
     * org.springframework.context.event.internalEventListenerProcessor
     * org.springframework.context.event.internalEventListenerFactory
     * propertyPlaceholderAutoConfiguration loadBalancerClientConfiguration
     * propertySourcesPlaceholderConfigurer
     * LoadBalancerClientConfiguration$ReactiveSupportConfiguration
     * discoveryClientServiceInstanceListSupplier
     * LoadBalancerClientConfiguration$BlockingSupportConfiguration,
     * reactorServiceInstanceLoadBalancer
     */
    // 这里集合返回{}
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
        .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
            RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(
        new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint())));
    // choose负载查找指定服务(order-server)
    return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
      if (!response.hasServer()) {
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
            .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
        throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
      }
      ServiceInstance retrievedInstance = response.getServer();
      URI uri = exchange.getRequest().getURI();
      // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
      // if the loadbalancer doesn't provide one.
      String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
      if (schemePrefix != null) {
        overrideScheme = url.getScheme();
      }
      DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
      URI requestUrl = reconstructURI(serviceInstance, uri);
      exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
      exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
      supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
    }).then(chain.filter(exchange))
        .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
            new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(CompletionContext.Status.FAILED,
                throwable, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
        .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(
            lifecycle -> lifecycle.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                CompletionContext.Status.SUCCESS, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
  }


  protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
    return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
  }


  private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
      Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
    // 从order-service对应的ApplicationContext中查找ReactorServiceInstanceLoadBalancer
    ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
        ReactorServiceInstanceLoadBalancer.class);
    if (loadBalancer == null) {
      throw new NotFoundException("No loadbalancer available for " + serviceId);
    }
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    // 查找服务实例
    return loadBalancer.choose(lbRequest);
  }


  private String getHint(String serviceId, Map<String, String> hints) {
    String defaultHint = hints.getOrDefault("default", "default");
    String hintPropertyValue = hints.get(serviceId);
    return hintPropertyValue != null ? hintPropertyValue : defaultHint;
  }
}


// 轮询算分
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
  final AtomicInteger position;
  ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;


  public Mono<Response<ServiceInstance>> choose(Request request) {
    // 接下面ClientFactoryObjectProvider中获取ServiceInstanceListSupplier
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
        .getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
  }


  private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
      List<ServiceInstance> serviceInstances) {
    Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
      ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    }
    return serviceInstanceResponse;
  }


  private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    if (instances.isEmpty()) {
      return new EmptyResponse();
    }
    // TODO: enforce order?
    int pos = Math.abs(this.position.incrementAndGet());
    ServiceInstance instance = instances.get(pos % instances.size());
    return new DefaultResponse(instance);
  }
}


class ClientFactoryObjectProvider<T> implements ObjectProvider<T> {
  private final NamedContextFactory<?> clientFactory;
  // type = ServiceInstanceListSupplier
  private final Class<T> type;
  // name = order-service
  private final String name;


  private ObjectProvider<T> delegate() {
    if (this.provider == null) {
      // 从order-service对应ApplicationContext中获取ServiceInstanceListSupplier
      // 这里最终返回的是:DiscoveryClientServiceInstanceListSupplier
      this.provider = this.clientFactory.getProvider(this.name, this.type);
    }
    return this.provider;
  }
}


public class LoadBalancerClientConfiguration {
  @Configuration(proxyBeanMethods = false)
  @ConditionalOnReactiveDiscoveryEnabled
  @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
  public static class ReactiveSupportConfiguration {


    @Bean
    @ConditionalOnBean(ReactiveDiscoveryClient.class)
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default", matchIfMissing = true)
    public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
        ConfigurableApplicationContext context) {
      // 这里最终构建的是:DiscoveryClientServiceInstanceListSupplier
      return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
    }
  }
}


public final class ServiceInstanceListSupplierBuilder {
  public ServiceInstanceListSupplierBuilder withDiscoveryClient() {
    this.baseCreator = context -> {
      // 先从order-service对应的ApplicationContext中查找ReactiveDiscoveryClient,如果你没有自定义,那么就会从
      // 父容器中查找,如果你使用的nacos,那么会返回NacosReactiveDiscoveryClient
      ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class);
      return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
    };
    return this;
  }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK