5

我是一个请求,我是如何被发送的?

 3 years ago
source link: https://my.oschina.net/u/4526289/blog/5130361
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

我是一个请求,我是如何被发送的? - 华为云开发者社区的个人空间 - OSCHINA - 中文开源技术交流社区

摘要:本文主要分析使用cse提供的RestTemplate的场景,其实cse提供的rpc注解(RpcReference)的方式最后的调用逻辑和RestTemplate是殊途同归的。

本文分享自华为云社区《我是一个请求,我该何去何从(下)》,原文作者:向昊 。

上次我们大概了解到了服务端是怎么处理请求的,那么发送请求又是个什么样的流程了?本文主要分析使用cse提供的RestTemplate的场景,其实cse提供的rpc注解(RpcReference)的方式最后的调用逻辑和RestTemplate是殊途同归的。

使用cse提供的RestTemplate时候,是这样初始化的:

RestTemplate restTemplate = RestTemplateBuilder.create();

restTemplate.getForObject("cse://appId:serviceName/xxx", Object.class);

我们可以注意到2个怪异的地方:

  • RestTemplate是通过RestTemplateBuilder.create()来获取的,而不是用的Spring里提供的。
  • 请求路径开头是cse而不是我们常见的http、https且需要加上服务所属的应用ID和服务名称。

根据url匹配RestTemplate

首先看下RestTemplateBuilder.create(),它返回的是org.apache.servicecomb.provider.springmvc.reference.RestTemplateWrapper,是cse提供的一个包装类。

// org.apache.servicecomb.provider.springmvc.reference.RestTemplateWrapper
// 用于同时支持cse调用和非cse调用
class RestTemplateWrapper extends RestTemplate {
    private final List<AcceptableRestTemplate> acceptableRestTemplates = new ArrayList<>();
 
    final RestTemplate defaultRestTemplate = new RestTemplate();
 
    RestTemplateWrapper() {
        acceptableRestTemplates.add(new CseRestTemplate());
    }
 
    RestTemplate getRestTemplate(String url) {
        for (AcceptableRestTemplate template : acceptableRestTemplates) {
            if (template.isAcceptable(url)) {
                return template;
            }
        }
        return defaultRestTemplate;
    }
}

AcceptableRestTemplate:这个类是一个抽象类,也是继承RestTemplate的,目前其子类就是CseRestTemplate,我们也可以看到在初始化的时候会默认往acceptableRestTemplates中添加一个CseRestTemplate。

回到使用的地方restTemplate.getForObject:这个方法会委托给如下方法:

public <T> T getForObject(String url, Class<T> responseType, Object... urlVariables) throws RestClientException {
    return getRestTemplate(url).getForObject(url, responseType, urlVariables);
}

可以看到首先会调用getRestTemplate(url),即会调用template.isAcceptable(url),如果匹配到了就返回CseRestTemplate,否则就返回常规的RestTemplate。那么再看下isAcceptable()这个方法:

到这里我们就清楚了路径中的cse://的作用了,就是为了使用CseRestTemplate来发起请求,也理解了为啥RestTemplateWrapper可以同时支持cse调用和非cse调用。

从上面可知,我们的cse调用其实都是委托给CseRestTemplate了。在构造CseRestTemplate的时候会初始化几个东西:

public CseRestTemplate() {
    setMessageConverters(Arrays.asList(new CseHttpMessageConverter()));
    setRequestFactory(new CseClientHttpRequestFactory());
    setUriTemplateHandler(new CseUriTemplateHandler());
}

这里需要重点关注new CseClientHttpRequestFactory():

public class CseClientHttpRequestFactory implements ClientHttpRequestFactory {
    @Override
    public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
        return new CseClientHttpRequest(uri, httpMethod);
    }
}

最终委托到了CseClientHttpRequest这个类,这里就是重头戏了!

我们先把注意力拉回到这句话:restTemplate.getForObject("cse://appId:serviceName/xxx", Object.class),从上面我们知道其逻辑是先根据url找到对应的RestTemplate,然后调用getForObject这个方法,最终这个方法会调用到:org.springframework.web.client.RestTemplate#doExecute:

protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
    @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
 
    ClientHttpResponse response = null;
    try {
        ClientHttpRequest request = createRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(request);
        }
        response = request.execute();
        handleResponse(url, method, response);
        return (responseExtractor != null ? responseExtractor.extractData(response) : null);
    }
}

createRequest(url, method):会调用getRequestFactory().createRequest(url, method),即最终会调用到我们初始化CseClientHttpRequest是塞的RequestFactory,所以这里会返回ClientHttpRequest这个类。

request.execute():这个方法会委托到org.apache.servicecomb.provider.springmvc.reference.CseClientHttpRequest#execute这个方法上。

至此我们知道前面的调用最终会委托到CseClientHttpRequest#execute这个方法上。

cse调用

接着上文分析:

public ClientHttpResponse execute() {
    path = findUriPath(uri);
    requestMeta = createRequestMeta(method.name(), uri);
 
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri.getRawSchemeSpecificPart());
    queryParams = queryStringDecoder.parameters();
 
    Object[] args = this.collectArguments();
 
    // 异常流程,直接抛异常出去
    return this.invoke(args);
}

createRequestMeta(method.name(), uri):这里主要是根据microserviceName去获取调用服务的信息,并会将获取的信息放入到Map中。服务信息如下:

v2-1237e110ecd59a6b5131a2978b9319a6_720w.jpg

可以看到里面的信息很丰富,例如应用名、服务名、还有接口对应的yaml信息等。

this.collectArguments():这里隐藏了一个校验点,就是会校验传入的参数是否符合对方接口的定义。主要是通过这个方法:org.apache.servicecomb.common.rest.codec.RestCodec#restToArgs,如果不符合真个流程就结束了。

准备invocation

从上面分析可知,获取到接口所需的参数后就会调用这个方法:org.apache.servicecomb.provider.springmvc.reference.CseClientHttpRequest#invoke:

private CseClientHttpResponse invoke(Object[] args) {
    Invocation invocation = prepareInvocation(args);
    Response response = doInvoke(invocation);
 
    if (response.isSuccessed()) {
        return new CseClientHttpResponse(response);
    }
 
    throw ExceptionFactory.convertConsumerException(response.getResult());
}

prepareInvocation(args):这个方法会准备好Invocation,这个Invocation在上集已经分析过了,不过上集中的它是为服务端服务的,那么咱们这块当然就得为消费端工作了

protected Invocation prepareInvocation(Object[] args) {
    Invocation invocation =
        InvocationFactory.forConsumer(requestMeta.getReferenceConfig(),
            requestMeta.getOperationMeta(),
            args);
 
    return invocation;
}

从名字也可以看出它是为消费端服务的,其实无论是forProvider还是forConsumer,它们最主要的区别就是加载的Handler不同,这次加载的Handler如下:

  • class org.apache.servicecomb.qps.ConsumerQpsFlowControlHandler(流控)
  • class org.apache.servicecomb.loadbalance.LoadbalanceHandler(负载)
  • class org.apache.servicecomb.bizkeeper.ConsumerBizkeeperHandler(容错)
  • class org.apache.servicecomb.core.handler.impl.TransportClientHandler(调用,默认加载的)

前面3个Handler 可以参考下这个微服务治理专栏

doInvoke(invocation):初始化好了invocation后就开始调用了。最终会调用到这个方法上:org.apache.servicecomb.core.provider.consumer.InvokerUtils#innerSyncInvoke

至此,这些动作就是cse中RestTemplate和rpc调用的不同之处。不过可以清楚的看到RestTemplate的方式是只支持同步的,即innerSyncInvoke,但是rpc是可以支持异步的,即reactiveInvoke

public static Response innerSyncInvoke(Invocation invocation) {
 
    invocation.next(respExecutor::setResponse);
}

到这里我们知道了,消费端发起请求还是得靠invocation的责任链驱动

启动invocation责任链

好了,咱们的老朋友又出现了:invocation.next,这个方法是个典型的责任链模式,其链条就是上面说的那4个Handler。前面3个就不分析了,直接跳到TransportClientHandler。

// org.apache.servicecomb.core.handler.impl.TransportClientHandler
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
    Transport transport = invocation.getTransport();
    transport.send(invocation, asyncResp);
}

invocation.getTransport():获取请求地址,即最终发送请求的时候还是以ip:port的形式。

transport.send(invocation, asyncResp):调用链为

org.apache.servicecomb.transport.rest.vertx.VertxRestTransport#send

  • ->org.apache.servicecomb.transport.rest.client.RestTransportClient#send(这里会初始化HttpClientWithContext,下面会分析)
  • ->org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#invoke(真正发送请求的地方)
public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception {
 
    createRequest(ipPort, path);
    clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());
    RestClientRequestImpl restClientRequest =
        new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);
    invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);
 
    Buffer requestBodyBuffer = restClientRequest.getBodyBuffer();
    HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer);
    invocation.getInvocationStageTrace().startClientFiltersRequest();
    // 触发filter.beforeSendRequest方法
    for (HttpClientFilter filter : httpClientFilters) {
        if (filter.enabled()) {
            filter.beforeSendRequest(invocation, requestEx);
        }
    }
 
    // 从业务线程转移到网络线程中去发送
    // httpClientWithContext.runOnContext
}

createRequest(ipPort, path):根据参数初始化HttpClientRequest clientRequest,初始化的时候会传入一个创建一个responseHandler,即对响应的处理。

注意org.apache.servicecomb.common.rest.filter.HttpClientFilter#afterReceiveResponse的调用就是在这里埋下伏笔的,是通过回调org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#processResponseBody这个方法触发的(在创建responseHandler时候创建的)

且org.apache.servicecomb.common.rest.filter.HttpClientFilter#beforeSendRequest:这个方法的触发我们也可以很清楚的看到在发送请求执行的。

requestEx:注意它的类型是HttpServletRequestEx,虽然名字里面带有Servlet,但是打开它的方法可以发现有很多我们在tomcat中那些常用的方法都直接抛出异常了,这也是一个易错点!

httpClientWithContext.runOnContext:用来发送请求的逻辑,不过这里还是有点绕的,下面重点分析下

httpClientWithContext.runOnContext

首先看下HttpClientWithContext的定义:

public class HttpClientWithContext {
    public interface RunHandler {
        void run(HttpClient httpClient);
    }
 
    private HttpClient httpClient;
 
    private Context context;
 
    public HttpClientWithContext(HttpClient httpClient, Context context) {
        this.httpClient = httpClient;
        this.context = context;
    }
 
    public void runOnContext(RunHandler handler) {
        context.runOnContext((v) -> {
            handler.run(httpClient);
        });
    }
}

从上面可知发送请求调用的是这个方法:runOnContext,参数为RunHandler接口,然后是以lambda的方式传入的,lambda的参数为httpClient,这个httpClient又是在HttpClientWithContext的构造函数中初始化的。这个构造函数是在org.apache.servicecomb.transport.rest.client.RestTransportClient#send这个方法中初始化的(调用org.apache.servicecomb.transport.rest.client.RestTransportClient#findHttpClientPool这个方法)。

但是我们观察调用的地方:

// 从业务线程转移到网络线程中去发送
httpClientWithContext.runOnContext(httpClient -> {
    clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout());
    processServiceCombHeaders(invocation, operationMeta);
    try {
        restClientRequest.end();
    } catch (Throwable e) {
        LOGGER.error(invocation.getMarker(),
            "send http request failed, local:{}, remote: {}.", getLocalAddress(), ipPort, e);
        fail((ConnectionBase) clientRequest.connection(), e);
    }
});

其实在这块逻辑中HttpClient是没有被用到的,实际上发送请求的动作是restClientRequest.end()触发的,restClientRequest是cse中的类RestClientRequestImpl,然后它包装了HttpClientRequest(vertx中提供的),即restClientRequest.end()最终还是委托到了HttpClientRequest.end()上了。

那么这个HttpClientRequest是怎么被初始化的了?它是在createRequest(ipPort, path)这个方法中初始化的,即在调用org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#invoke方法入口处。

初始化的逻辑如下:

clientRequest = httpClientWithContext.getHttpClient().request(method, 
requestOptions, this::handleResponse)

httpClientWithContext.getHttpClient():这个方法返回的是HttpClient,上面说的HttpClient作用就体现出来了,用来初始化了我们发送请求的关键先生:HttpClientRequest。那么至此我们发送请求的整体逻辑大概就清晰了。

无论是采用RestTemplate的方式还是采用rpc注解的方式来发送请求,其底层逻辑其实是一样的。即首先根据请求信息匹配到对方的服务信息,然后经过一些列的Handler处理,如限流、负载、容错等等(这也是一个很好的扩展机制),最终会走到TransportClientHandler这个Handler,然后根据条件去初始化发送的request,经过HttpClientFilter的处理后就会委托给vertx的HttpClientRequest来真正的发出请求。

点击关注,第一时间了解华为云新鲜技术~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK