7

dubbo(九):timeout超时机制解析 - 等你归去来

 2 years ago
source link: https://www.cnblogs.com/yougewe/p/16460724.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client



  在网络请求时,总会有各种异常情况出现,我们需要提前处理这种情况。在完善的rpc组件dubbo中,自然是不会少了这一层东西的。我们只需要通过一些简单的配置就可以达到超时限制的作用了。

  dubbo的设计理念是,客户端控制优先,服务端控制兜底。

1. 超时机制的实现思路

  要想实现超时,一般有两个思路。一个是客户端自行设置一个超时限制,达到超时时间还未返回,则抛出异常,不再等待结果。二是通过在超时后,将连接断开,从而使数据请求中断,最终也是以抛出异常的方式返回的。

  当然,超时有两种情况,一种是自己主动的超时,另一种是被别人关掉连接发生的超时(需主动主发送超时消息)。一般我们认为主动设置的超时是可控的,被动的超时将是一个不可逾越的鸿沟,如果必须需要更长的时间才能拿到结果时,此种超时将限制我们,我们只能另谋出路了,比如调用的异步化。

  一般地,要想实现超时,我们也有两种方式:一种是调用别人提供的api,其中包含了超时设置,此时仅需简单设置即可;另一种是我们自行实现的超时,比如原本只有一个无限接口,我们要实现超时,必须将其异步化,通过额外的线程来进行超时的检测和控制。

  那么,dubbo又是怎样实现超时的呢?

2. 客户端实现超时

  我们前面说过,dubbo中consumer端可以设置超时,服务端也可以提供超时设置。那么,会不会是客户端和服务端都要实现超时机制呢?不管怎么样,客户端是一定要做的。所以,我们先来看看客户端实现超时的机制。

2.1. 客户端使用超时的方式

  首先,dubbo的调置超时方式,按照其整体架构设计理念,都有几个作用域:应用级 -> 接口级 -> 方法级。 consumer端 -> provider端。

// 消费者端特定方法的配置
<dubbo:consumer interface="com.alibaba.xxx.XxxService" >
    <dubbo:method name="findPerson" timeout="1000" />
</dubbo:consumer>
// 消费者端特定接口的配置
<dubbo:consumer interface="com.alibaba.xxx.XxxService" timeout="200" />
// 提供者端特定方法的配置
<dubbo:service interface="com.alibaba.xxx.XxxService" >
    <dubbo:method name="findPerson" timeout="1000" />
</dubbo:service>
// 提供者端特定接口的配置
<dubbo:service interface="com.alibaba.xxx.XxxService" timeout="200" />

  当然了,上面这种是使用xml进行配置的,你还可以使用properties文件进行配置,也可以使用java代码直接进行配置。

2.2. 超时参数的读取与使用

  这些参数设置好后,在调用rpc时,进行想入相应的Invoket,进行读取参数,使用。

    // org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // 同步和异步,底层都是异步请求,仅做上层封装
        Result asyncResult = invoker.invoke(invocation);

        try {
            // 同步请求时,在内部等待
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                /**
                 * NOTICE!
                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
                 */
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                    invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            // 超时返回,给出详细堆栈
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else {
                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }

    // org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke
    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addObjectAttachmentsIfAbsent(attachment);
        }

        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
            invocation.addObjectAttachments(contextAttachments);
        }

        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
        if (serializationId != null) {
            invocation.put(SERIALIZATION_ID_KEY, serializationId);
        }

        AsyncRpcResult asyncResult;
        try {
            // 调用远程方法
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }

    // org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // 获取超时设置
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // 响应结果回调,使用线程池接收
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                // 向服务端发送请求,并返回 future 作为结果接收器
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    // 超时配置读取,多种方式,多种优先级
    private int calculateTimeout(Invocation invocation, String methodName) {
        // timeout-countdown, 需要传导到服务端的超时控制
        Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
        // 默认1s超时
        int timeout = DEFAULT_TIMEOUT;
        if (countdown == null) {
            timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT);
            if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
                invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
            }
        } else {
            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
            timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
            invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
        }
        return timeout;
    }
    // org.apache.dubbo.rpc.support.RpcUtils#getTimeout
    public static long getTimeout(URL url, String methodName, RpcContext context, long defaultTimeout) {
        long timeout = defaultTimeout;
        // 先方法,后接口
        // 事实上,所有接口级的变量在注册的时候已经作用到了方法级上了,所以只需读取方法上的参数即可
        Object genericTimeout = context.getObjectAttachment(TIMEOUT_KEY);
        if (genericTimeout != null) {
            timeout = convertToNumber(genericTimeout, defaultTimeout);
        } else if (url != null) {
            timeout = url.getMethodPositiveParameter(methodName, TIMEOUT_KEY, defaultTimeout);
        }
        return timeout;
    }
    // org.apache.dubbo.common.URL#getMethodPositiveParameter(java.lang.String, java.lang.String, long)
    public long getMethodPositiveParameter(String method, String key, long defaultValue) {
        if (defaultValue <= 0) {
            throw new IllegalArgumentException("defaultValue <= 0");
        }
        long value = getMethodParameter(method, key, defaultValue);
        return value <= 0 ? defaultValue : value;
    }

    public long getMethodPositiveParameter(String method, String key, long defaultValue) {
        if (defaultValue <= 0) {
            throw new IllegalArgumentException("defaultValue <= 0");
        }
        long value = getMethodParameter(method, key, defaultValue);
        return value <= 0 ? defaultValue : value;
    }
    // org.apache.dubbo.common.URL#getMethodParameter(java.lang.String, java.lang.String, long)
    public long getMethodParameter(String method, String key, long defaultValue) {
        Number n = getCachedNumber(method, key);
        if (n != null) {
            return n.longValue();
        }
        String value = getMethodParameter(method, key);
        if (StringUtils.isEmpty(value)) {
            return defaultValue;
        }
        long l = Long.parseLong(value);
        updateCachedNumber(method, key, l);
        return l;
    }
    
    // org.apache.dubbo.rpc.protocol.AbstractInvoker#getCallbackExecutor
    protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
        if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
            // 同步请求使用少量的共享线程池,实际上是做进一步封装处理
            return new ThreadlessExecutor(sharedExecutor);
        } else {
            // 异步调用则直接使用共享线程池,不受其他节点控制
            return sharedExecutor;
        }
    }

  从上面可以看出,dubbo的超时机制是通过异步线程future的方式实现的,其中,同步调用的超时设置,底层也是用异步实现。这样既简化了底层实现,也对外提供了很好的易用性。因为底层都是通过netty或nio实现网络通信,而这种实现一般又是select-poll 模型或者 epoll 模型,反正也必须要用异步处理,所以不管如何也是跑不掉这个实现。只要实现好一个底层异步通知,全部基石就都好了。而上层,则只需关注是用户实现,还是框架实现了。

2.3. 客户端超时监控处理

  上面的实现,我们并没有看到具体是如何实现超时的,毕竟我们只是看到了表面现象,即只是设置了一个 timeout参数,而已。更深层次的实现,请继续。也就是说dubbo是在做请求的同时,做了超时的设置工作。

    // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request
    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null,
                    "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        // 里面包含了一个超时任务 timeTask
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture
    /**
     * init a DefaultFuture
     * 1.init a DefaultFuture
     * 2.timeout check
     *
     * @param channel channel
     * @param request the request
     * @param timeout timeout
     * @return a new DefaultFuture
     */
    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        // timeout check
        timeoutCheck(future);
        return future;
    }
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheck
    /**
     * check time out of the future
     */
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        // 添加一个定时器
        future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }
    
    // org.apache.dubbo.common.timer.HashedWheelTimer#newTimeout
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

    
        // org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#TimeoutCheckTask
        TimeoutCheckTask(Long requestID) {
            this.requestID = requestID;
        }
        @Override
        public void run(Timeout timeout) {
            DefaultFuture future = DefaultFuture.getFuture(requestID);
            if (future == null || future.isDone()) {
                return;
            }

            if (future.getExecutor() != null) {
                future.getExecutor().execute(() -> notifyTimeout(future));
            } else {
                notifyTimeout(future);
            }
        }
        // org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#notifyTimeout
        private void notifyTimeout(DefaultFuture future) {
            // create exception response.
            Response timeoutResponse = new Response(future.getId());
            // set timeout status.
            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
            // handle response.
            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
        }
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#received
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response status is " + response.getStatus()
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    // 抛出异常消息
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            // 封装为 TimeoutException
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }

  总体来说就是,在提交服务端的查询请求时,会开启定时任务,检查超时。如果定时任务到期,还未收到结果则会触发超时通知。如果客户端还未成功发送数据,则认为是客户端自己超时。如果已经将数据发送出去,则认为暗服务端超时。这相当于是一个看门狗的形式处理了,就是说,不管服务端和客户端本身如何,总能被这东西给发现,所以这种超时控制是精确的。

  当然,除了看门狗的监控,还有的情况是需要应用自己去主动发现的。至少,它不能一直让看门狗起作用吧。

2.4. 异步处理结果的超时处理

  异步结果的处理有两种入口方式:一是后台线程处理好之后,自行将结果放置到合适的地方;二是主线程主动查询结果,如果没有完成就等待,直到完成或超时返回;dubbo是在发送请求时,设置一个定时器,检查是否超时,到超时时间就发送一个超时事件。并结束任务。

  同步和异步的结果处理方式如下:

    // org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 根据requestId, 取出之前设定的executor, 提交给业务线程池调用
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // 将消息封装成 ChannelEventRunnable, 交由后续处理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    
    /**
     * Currently, this method is mainly customized to facilitate the thread model on consumer side.
     * 1. Use ThreadlessExecutor, aka., delegate callback directly to the thread initiating the call.
     * 2. Use shared executor to execute the callback.
     *
     * @param msg
     * @return
     */
    public ExecutorService getPreferredExecutorService(Object msg) {
        if (msg instanceof Response) {
            Response response = (Response) msg;
            DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
            // a typical scenario is the response returned after timeout, the timeout response may has completed the future
            if (responseFuture == null) {
                return getSharedExecutorService();
            } else {
                // 取出之前设定的executor
                ExecutorService executor = responseFuture.getExecutor();
                if (executor == null || executor.isShutdown()) {
                    executor = getSharedExecutorService();
                }
                return executor;
            }
        } else {
            return getSharedExecutorService();
        }
    }
    // 这是同步调用时使用到的线程池 ThreadlessExecutor, 接收到数据后不会立即处理
    /**
     * If the calling thread is still waiting for a callback task, add the task into the blocking queue to wait for schedule.
     * Otherwise, submit to shared callback executor directly.
     *
     * @param runnable
     */
    @Override
    public void execute(Runnable runnable) {
        runnable = new RunnableWrapper(runnable);
        synchronized (lock) {
            if (!waiting) {
                sharedExecutor.execute(runnable);
            } 
            // 只要客户端的还没有触发结果检查,那么将放入队列中,即不会主动进行通知结果
            else {
                queue.add(runnable);
            }
        }
    }

  可以看到同步和异步的处理方式区别在于使用不同的线程池实现,异步是直接运行,而同步则做了一次包装,这也为它自定义更合适的处理方式打下了基础。

2.5. 同步请求的结果处理方式

  同步处理时,在上层接口调用也是无感的。但是底层都被包装成了异步调用,所以会在上层api中主动进行结果的等待处理。当然,既然是同步处理,它自然是不会主动设置一个较小的超时的,而是用了一个 Integer.MAX_VALUE 的超时设置,真正的超时是由异步结果处理中抛出。

    // asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    // org.apache.dubbo.rpc.AsyncRpcResult#get(long, java.util.concurrent.TimeUnit)
    @Override
    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            threadlessExecutor.waitAndDrain();
        }
        // 最终直接从指定位置获取结果即可
        return responseFuture.get(timeout, unit);
    }
    // org.apache.dubbo.common.threadpool.ThreadlessExecutor#waitAndDrain()
    /**
     * Waits until there is a task, executes the task and all queued tasks (if there're any). The task is either a normal
     * response or a timeout response.
     */
    public void waitAndDrain() throws InterruptedException {
        /**
         * Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time,
         * once the response (the task) reached and being executed waitAndDrain will return, the whole request process
         * then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately.
         *
         * There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of
         * 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call
         * of it is totally sequential.
         */
        if (finished) {
            return;
        }

        Runnable runnable;
        try {
            // 如果服务端没有响应,这里是会一直阻塞,因此也达到了同步等待的效果
            runnable = queue.take();
        }catch (InterruptedException e){
            waiting = false;
            throw e;
        }
        // 当拿到结果之后,再运行后续的任务,一般没啥事了,主要就是将结果放置到合适的位置,以后后续可取
        synchronized (lock) {
            waiting = false;
            runnable.run();
        }

        runnable = queue.poll();
        while (runnable != null) {
            runnable.run();
            runnable = queue.poll();
        }
        // mark the status of ThreadlessExecutor as finished.
        finished = true;
    }

  同步只是表象,异步才是核心。

2.6. 异步的处理方式

  异步执行时,使用的就是 ThreadPoolExecutor, 直接进行execute, 即提交到线程池立即执行。即都是统一用共享线程池进行处理,这样做的好处是,不需要等待客户端调用结果,而是主动将结果放置到future的result位置,只需等待处理即可。

    // org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
    @Override
    public void run() {
        if (state == ChannelState.RECEIVED) {
            try {
                // 直接进入到netty 管道出入站流程,并最终如前面将结果设置到指定位置
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }

  异步处理没啥特别的,直接交由netty的pipeline机制完全处理即可。

2.7. netty相关的一点设置

  NettypClient, 与服务端交互的入口。主要用于开启网络连接,设置各种处理器,总体来说就是netty的编程模型。感兴趣的自行翻阅。

ContractedBlock.gifExpandedBlockStart.gif

View Code

2.8. 放置响应结果

  前面多次提到响应结束后,结果将会被放到合适的位置,我们就简单看下它到底是怎么放置的呢?其实就是 CompletableFuture 的complete方法。

    // 主动置位结果
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            // 放置结果后结束
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }

  可以看出,同步和异步调用的区别主要是线程池的处理,以级后续事件的触发时机不同。同步调用在框架层面的假设是,发送消息之后,很快就会进行get() 操作,所以此时只需将就绪事件放入队列即可。而异步调用则可能没有后续的用户驱动,所以不能有卡点的出现,所以直接运行相应的结果通知,将结果放置到正确的位置。至于客户端来取或不来取,整体都不景程。

  超时机制,是通过一个定时器,到点检查,检查到即超时。如果结果先出来,那么,主动将定时器取消,一切正常。因为定时器是另外的线程池进行处理,不受当前处理线程的影响,所以可以很好地控制超时。不管是客户端超时,还是服务端超时,都一概处理。

  最后,再说下超时时的消息描述信息,因为这可能给我排查问题带来极大的便利。

2.9. 超时异常信息解析

  判断是客户端超时还是服务端超时,是通过是否将消息发送出去为准的。实际上,它并不能区分出到底是客户端发送得晚了,还是服务端真的处理慢了。也就是说,当客户端自己慢的时候,它很可能认为是服务端超时了。而且,客户端是假设服务端一发送加响应消息,客户端就立即能收到结果,然后就以当时时间来判定服务端的处理时间。然而这样的判断方式,在客户端自身压力很大的情况下,仍然是有失偏颇的。代码描述如下:

    // 错误信息详细描述
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#getTimeoutMessage
    private String getTimeoutMessage(boolean scan) {
        long nowTimestamp = System.currentTimeMillis();
        return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
                + (scan ? " by scan timer" : "") + ". start time: "
                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(nowTimestamp))) + ","
                + (sent > 0 ? " client elapsed: " + (sent - start)
                + " ms, server elapsed: " + (nowTimestamp - sent)
                : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
                + timeout + " ms, request: " + (logger.isDebugEnabled() ? request : getRequestWithoutData()) + ", channel: " + channel.getLocalAddress()
                + " -> " + channel.getRemoteAddress();
    }

3. server端超时实现

  通过上一节,我们可以看到客户端的超时机制比较简单,但是实际上也是非常完善的。那么,对于服务端是否也有同样的一套东西呢?事实上,要控制服务端的超时,难度要比客户端大:一是因为服务端作为服务提供者,应该是要保证服务正常处理,而不是边处理边检查是否超时;二是服务端如果发现了超时,应该怎么对客户端说呢?抛出异常或者不返回消息?客户端因为是终端,他可以忽略结果即可,但服务端这样做却是不太合适的。另外,服务端计算超时的方式是不完善的,因为超时一般是针对客户端而言,因为整体链路除了服务端的处理时间,还有网络传输、处理时间,客户端自行处理的时间等等,所以服务端的超时标准不太可靠。

3.1. 业务处理线程池接入

  当网络数据就绪之后,会将数据提交到业务线程池进行处理,也就是说io线程和业务线程是分离的。这是一般的处理方式,避免阻塞io线程,也方便扩展业务线程。我们理解,其实要做超时,在这个地方是比较合适的。

    // 服务端消息接入
    // org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // 交由对应的线程池异步处理, 状态为 RECEIVED
            // 此处其实可能存在阻塞等待问题
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    // handleRequest
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    }

    // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 发生异常情况时,会被取消执行
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            channel.send(res);
            return;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            CompletionStage<Object> future = handler.reply(channel, msg);
            // 异步等待结果响应回调
            future.whenComplete((appResult, t) -> {
                try {
                    // 没有异常,就是正常
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(appResult);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    // 在客户端关闭连接时,发送消息将会失败
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }

    // org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 调用正常的rpc方法
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            CompletableFuture<Object> future = wrapWithFuture(value);
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse(invocation);
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                return result;
            });
            // 包装返回结果
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

  server端确实也使用了一个独立的线程池来处理业务,但是并没有看到相应的外围超时处理。这是比较疑惑的,因为它已经错过了最佳判断超时的时机了。那么,是否服务端就不能提供超时功能了呢?

3.2. server解析timeout信息

  server端仅在特殊情况下才会处理超时。它是在 TimeoutFilter 做的简单处理,仅将结果清空,然后正常返回了。

    // org.apache.dubbo.rpc.protocol.FilterNode#invoke
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
            asyncResult = filter.invoke(next, invocation);
        } catch (Exception e) {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                try {
                    Filter.Listener listener = listenableFilter.listener(invocation);
                    if (listener != null) {
                        listener.onError(e, invoker, invocation);
                    }
                } finally {
                    listenableFilter.removeListener(invocation);
                }
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                listener.onError(e, invoker, invocation);
            }
            throw e;
        } finally {

        }
        return asyncResult.whenCompleteWithContext((r, t) -> {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                Filter.Listener listener = listenableFilter.listener(invocation);
                try {
                    if (listener != null) {
                        if (t == null) {
                            listener.onResponse(r, invoker, invocation);
                        } else {
                            listener.onError(t, invoker, invocation);
                        }
                    }
                } finally {
                    listenableFilter.removeListener(invocation);
                }
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                if (t == null) {
                    listener.onResponse(r, invoker, invocation);
                } else {
                    listener.onError(t, invoker, invocation);
                }
            }
        });
    }

    // org.apache.dubbo.rpc.filter.ContextFilter#invoke
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, Object> attachments = invocation.getObjectAttachments();
        if (attachments != null) {
            Map<String, Object> newAttach = new HashMap<>(attachments.size());
            for (Map.Entry<String, Object> entry : attachments.entrySet()) {
                String key = entry.getKey();
                if (!UNLOADING_KEYS.contains(key)) {
                    newAttach.put(key, entry.getValue());
                }
            }
            attachments = newAttach;
        }

        RpcContext context = RpcContext.getContext();
        context.setInvoker(invoker)
                .setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
        String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY);
        if (StringUtils.isNotEmpty(remoteApplication)) {
            context.setRemoteApplicationName(remoteApplication);
        } else {
            context.setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY));
        }
        // 此处为服务端的超时实现,通过 _TO:xx 配置,由客户端传导到服务端进行控制,当超时时,结果将被清空
        // 即此处的超时是伪超时,客户端实现的超时才是真实的
        long timeout = RpcUtils.getTimeout(invocation, -1);
        if (timeout != -1) {
            context.set(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));
        }

        // merged from dubbox
        // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
        if (attachments != null) {
            if (context.getObjectAttachments() != null) {
                context.getObjectAttachments().putAll(attachments);
            } else {
                context.setObjectAttachments(attachments);
            }
        }

        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        try {
            context.clearAfterEachInvoke(false);
            return invoker.invoke(invocation);
        } finally {
            context.clearAfterEachInvoke(true);
            // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
            RpcContext.removeContext(true);
            RpcContext.removeServerContext();
        }
    }
    
    // org.apache.dubbo.rpc.filter.TimeoutFilter#invoke
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }
    // TimeoutFilter
    @Override
    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        // "timeout-countdown"
        Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
        if (obj != null) {
            // 超时后,将结果清空
            TimeoutCountDown countDown = (TimeoutCountDown) obj;
            if (countDown.isExpired()) {
                ((AppResponse) appResponse).clear(); // clear response in case of timeout.
                if (logger.isWarnEnabled()) {
                    logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
                            Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
                            ", invoke elapsed " + countDown.elapsedMillis() + " ms.");
                }
            }
        }
    }

  服务端的超时控制,并非像客户端那样,可以直接断开服务,或者丢弃连接。而是需要谨慎处理,此处为清空结果。这也许不是大家想要的超时。

3.3. 服务端server的开启过程

  服务端作为提供者,会将自己所有的必要的服务注册到注册中心,所以在在启动时会使用netty服务框架,打开网络端口。这个过程是在导出第一个service的时候进行的。感兴趣的自行翻阅。

ContractedBlock.gifExpandedBlockStart.gif

View Code

  虽然本节是讲server端的超时控制的,但是很明显这方便的讲述也很少,原因是本来就没打算在server端实现超时。我们要做的,也许只是验证一下而已。

  dubbo中的超时设置,可以在服务端、消费端,而且官方建议是设置在服务端,客户端做特殊处理即可。原因是服务端更清楚接口的性能情况。这是完全理解的。但它会给人一种感觉,好像是真的服务端真的实现了超时处理。然而实际情况却是,它仅将该参数传导到客户端,然后由客户端来控制了。这倒是和直觉不太一样,但是谁能说他的直觉就是对的呢。

  其实要想实现真正的服务端超时,也是可以的。同样,它也需要借助一些额外的线程池。比如,在接收其实要想实现真正的服务端超时,也是可以的。同样,它也需要借助一些额外的线程池。比如,在接收完数据之后,需要添加到业务线程池中进行处理,此时在提交之前写入一个开始时间,然后在线程池真正处理的时候,与当前时间运算,超时后就不再进行后续的计算逻辑,而是直接响应客户端超时。这个思路很简单,但至于为什么没有被实现,也许会有其他的考量,我们只讨论局部思路了。

注:本篇使用dubbo版本为 2.7.0 


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK