dubbo(九):timeout超时机制解析 - 等你归去来

 2 years ago
source link: https://www.cnblogs.com/yougewe/p/16460724.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client



1. 超时机制的实现思路





2. 客户端实现超时


2.1. 客户端使用超时的方式

  首先,dubbo的调置超时方式,按照其整体架构设计理念,都有几个作用域:应用级 -> 接口级 -> 方法级。 consumer端 -> provider端。

// 消费者端特定方法的配置
<dubbo:consumer interface="com.alibaba.xxx.XxxService" >
    <dubbo:method name="findPerson" timeout="1000" />
// 消费者端特定接口的配置
<dubbo:consumer interface="com.alibaba.xxx.XxxService" timeout="200" />
// 提供者端特定方法的配置
<dubbo:service interface="com.alibaba.xxx.XxxService" >
    <dubbo:method name="findPerson" timeout="1000" />
// 提供者端特定接口的配置
<dubbo:service interface="com.alibaba.xxx.XxxService" timeout="200" />


2.2. 超时参数的读取与使用


    // org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke
    public Result invoke(Invocation invocation) throws RpcException {
        // 同步和异步,底层都是异步请求,仅做上层封装
        Result asyncResult = invoker.invoke(invocation);

        try {
            // 同步请求时,在内部等待
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                 * NOTICE!
                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                    invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            // 超时返回,给出详细堆栈
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else {
                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        return asyncResult;

    // org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        RpcInvocation invocation = (RpcInvocation) inv;
        if (CollectionUtils.isNotEmptyMap(attachment)) {

        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).

        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
        if (serializationId != null) {
            invocation.put(SERIALIZATION_ID_KEY, serializationId);

        AsyncRpcResult asyncResult;
        try {
            // 调用远程方法
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
        } catch (Throwable e) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;

    // org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // 获取超时设置
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // 响应结果回调,使用线程池接收
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                // 向服务端发送请求,并返回 future 作为结果接收器
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                return result;
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    // 超时配置读取,多种方式,多种优先级
    private int calculateTimeout(Invocation invocation, String methodName) {
        // timeout-countdown, 需要传导到服务端的超时控制
        Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
        // 默认1s超时
        int timeout = DEFAULT_TIMEOUT;
        if (countdown == null) {
            timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT);
            if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
                invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
        } else {
            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
            timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
            invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
        return timeout;
    // org.apache.dubbo.rpc.support.RpcUtils#getTimeout
    public static long getTimeout(URL url, String methodName, RpcContext context, long defaultTimeout) {
        long timeout = defaultTimeout;
        // 先方法,后接口
        // 事实上,所有接口级的变量在注册的时候已经作用到了方法级上了,所以只需读取方法上的参数即可
        Object genericTimeout = context.getObjectAttachment(TIMEOUT_KEY);
        if (genericTimeout != null) {
            timeout = convertToNumber(genericTimeout, defaultTimeout);
        } else if (url != null) {
            timeout = url.getMethodPositiveParameter(methodName, TIMEOUT_KEY, defaultTimeout);
        return timeout;
    // org.apache.dubbo.common.URL#getMethodPositiveParameter(java.lang.String, java.lang.String, long)
    public long getMethodPositiveParameter(String method, String key, long defaultValue) {
        if (defaultValue <= 0) {
            throw new IllegalArgumentException("defaultValue <= 0");
        long value = getMethodParameter(method, key, defaultValue);
        return value <= 0 ? defaultValue : value;

    public long getMethodPositiveParameter(String method, String key, long defaultValue) {
        if (defaultValue <= 0) {
            throw new IllegalArgumentException("defaultValue <= 0");
        long value = getMethodParameter(method, key, defaultValue);
        return value <= 0 ? defaultValue : value;
    // org.apache.dubbo.common.URL#getMethodParameter(java.lang.String, java.lang.String, long)
    public long getMethodParameter(String method, String key, long defaultValue) {
        Number n = getCachedNumber(method, key);
        if (n != null) {
            return n.longValue();
        String value = getMethodParameter(method, key);
        if (StringUtils.isEmpty(value)) {
            return defaultValue;
        long l = Long.parseLong(value);
        updateCachedNumber(method, key, l);
        return l;
    // org.apache.dubbo.rpc.protocol.AbstractInvoker#getCallbackExecutor
    protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
        if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
            // 同步请求使用少量的共享线程池,实际上是做进一步封装处理
            return new ThreadlessExecutor(sharedExecutor);
        } else {
            // 异步调用则直接使用共享线程池,不受其他节点控制
            return sharedExecutor;

  从上面可以看出,dubbo的超时机制是通过异步线程future的方式实现的,其中,同步调用的超时设置,底层也是用异步实现。这样既简化了底层实现,也对外提供了很好的易用性。因为底层都是通过netty或nio实现网络通信,而这种实现一般又是select-poll 模型或者 epoll 模型,反正也必须要用异步处理,所以不管如何也是跑不掉这个实现。只要实现好一个底层异步通知,全部基石就都好了。而上层,则只需关注是用户实现,还是框架实现了。

2.3. 客户端超时监控处理

  上面的实现,我们并没有看到具体是如何实现超时的,毕竟我们只是看到了表面现象,即只是设置了一个 timeout参数,而已。更深层次的实现,请继续。也就是说dubbo是在做请求的同时,做了超时的设置工作。

    // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null,
                    "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        // create request.
        Request req = new Request();
        // 里面包含了一个超时任务 timeTask
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
        } catch (RemotingException e) {
            throw e;
        return future;
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture
     * init a DefaultFuture
     * 1.init a DefaultFuture
     * 2.timeout check
     * @param channel channel
     * @param request the request
     * @param timeout timeout
     * @return a new DefaultFuture
    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        // timeout check
        return future;
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheck
     * check time out of the future
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        // 添加一个定时器
        future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    // org.apache.dubbo.common.timer.HashedWheelTimer#newTimeout
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        if (unit == null) {
            throw new NullPointerException("unit");

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");


        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        return timeout;

        // org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#TimeoutCheckTask
        TimeoutCheckTask(Long requestID) {
            this.requestID = requestID;
        public void run(Timeout timeout) {
            DefaultFuture future = DefaultFuture.getFuture(requestID);
            if (future == null || future.isDone()) {

            if (future.getExecutor() != null) {
                future.getExecutor().execute(() -> notifyTimeout(future));
            } else {
        // org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#notifyTimeout
        private void notifyTimeout(DefaultFuture future) {
            // create exception response.
            Response timeoutResponse = new Response(future.getId());
            // set timeout status.
            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
            // handle response.
            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#received
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response status is " + response.getStatus()
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
        } finally {
    // 抛出异常消息
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        if (res.getStatus() == Response.OK) {
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            // 封装为 TimeoutException
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));



2.4. 异步处理结果的超时处理



    // org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
    public void received(Channel channel, Object message) throws RemotingException {
        // 根据requestId, 取出之前设定的executor, 提交给业务线程池调用
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // 将消息封装成 ChannelEventRunnable, 交由后续处理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
     * Currently, this method is mainly customized to facilitate the thread model on consumer side.
     * 1. Use ThreadlessExecutor, aka., delegate callback directly to the thread initiating the call.
     * 2. Use shared executor to execute the callback.
     * @param msg
     * @return
    public ExecutorService getPreferredExecutorService(Object msg) {
        if (msg instanceof Response) {
            Response response = (Response) msg;
            DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
            // a typical scenario is the response returned after timeout, the timeout response may has completed the future
            if (responseFuture == null) {
                return getSharedExecutorService();
            } else {
                // 取出之前设定的executor
                ExecutorService executor = responseFuture.getExecutor();
                if (executor == null || executor.isShutdown()) {
                    executor = getSharedExecutorService();
                return executor;
        } else {
            return getSharedExecutorService();
    // 这是同步调用时使用到的线程池 ThreadlessExecutor, 接收到数据后不会立即处理
     * If the calling thread is still waiting for a callback task, add the task into the blocking queue to wait for schedule.
     * Otherwise, submit to shared callback executor directly.
     * @param runnable
    public void execute(Runnable runnable) {
        runnable = new RunnableWrapper(runnable);
        synchronized (lock) {
            if (!waiting) {
            // 只要客户端的还没有触发结果检查,那么将放入队列中,即不会主动进行通知结果
            else {


2.5. 同步请求的结果处理方式

  同步处理时,在上层接口调用也是无感的。但是底层都被包装成了异步调用,所以会在上层api中主动进行结果的等待处理。当然,既然是同步处理,它自然是不会主动设置一个较小的超时的,而是用了一个 Integer.MAX_VALUE 的超时设置,真正的超时是由异步结果处理中抛出。

    // asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    // org.apache.dubbo.rpc.AsyncRpcResult#get(long, java.util.concurrent.TimeUnit)
    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        // 最终直接从指定位置获取结果即可
        return responseFuture.get(timeout, unit);
    // org.apache.dubbo.common.threadpool.ThreadlessExecutor#waitAndDrain()
     * Waits until there is a task, executes the task and all queued tasks (if there're any). The task is either a normal
     * response or a timeout response.
    public void waitAndDrain() throws InterruptedException {
         * Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time,
         * once the response (the task) reached and being executed waitAndDrain will return, the whole request process
         * then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately.
         * There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of
         * 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call
         * of it is totally sequential.
        if (finished) {

        Runnable runnable;
        try {
            // 如果服务端没有响应,这里是会一直阻塞,因此也达到了同步等待的效果
            runnable = queue.take();
        }catch (InterruptedException e){
            waiting = false;
            throw e;
        // 当拿到结果之后,再运行后续的任务,一般没啥事了,主要就是将结果放置到合适的位置,以后后续可取
        synchronized (lock) {
            waiting = false;

        runnable = queue.poll();
        while (runnable != null) {
            runnable = queue.poll();
        // mark the status of ThreadlessExecutor as finished.
        finished = true;


2.6. 异步的处理方式

  异步执行时,使用的就是 ThreadPoolExecutor, 直接进行execute, 即提交到线程池立即执行。即都是统一用共享线程池进行处理,这样做的好处是,不需要等待客户端调用结果,而是主动将结果放置到future的result位置,只需等待处理即可。

    // org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
    public void run() {
        if (state == ChannelState.RECEIVED) {
            try {
                // 直接进入到netty 管道出入站流程,并最终如前面将结果设置到指定位置
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
        } else {
            switch (state) {
            case CONNECTED:
                try {
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            case DISCONNECTED:
                try {
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                logger.warn("unknown state: " + state + ", message is " + message);



2.7. netty相关的一点设置

  NettypClient, 与服务端交互的入口。主要用于开启网络连接,设置各种处理器,总体来说就是netty的编程模型。感兴趣的自行翻阅。


View Code

2.8. 放置响应结果

  前面多次提到响应结束后,结果将会被放到合适的位置,我们就简单看下它到底是怎么放置的呢?其实就是 CompletableFuture 的complete方法。

    // 主动置位结果
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        if (res.getStatus() == Response.OK) {
            // 放置结果后结束
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));

  可以看出,同步和异步调用的区别主要是线程池的处理,以级后续事件的触发时机不同。同步调用在框架层面的假设是,发送消息之后,很快就会进行get() 操作,所以此时只需将就绪事件放入队列即可。而异步调用则可能没有后续的用户驱动,所以不能有卡点的出现,所以直接运行相应的结果通知,将结果放置到正确的位置。至于客户端来取或不来取,整体都不景程。



2.9. 超时异常信息解析


    // 错误信息详细描述
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#getTimeoutMessage
    private String getTimeoutMessage(boolean scan) {
        long nowTimestamp = System.currentTimeMillis();
        return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
                + (scan ? " by scan timer" : "") + ". start time: "
                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(nowTimestamp))) + ","
                + (sent > 0 ? " client elapsed: " + (sent - start)
                + " ms, server elapsed: " + (nowTimestamp - sent)
                : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
                + timeout + " ms, request: " + (logger.isDebugEnabled() ? request : getRequestWithoutData()) + ", channel: " + channel.getLocalAddress()
                + " -> " + channel.getRemoteAddress();

3. server端超时实现


3.1. 业务处理线程池接入


    // 服务端消息接入
    // org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // 交由对应的线程池异步处理, 状态为 RECEIVED
            // 此处其实可能存在阻塞等待问题
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);

    // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
    public void received(Channel channel, Object message) throws RemotingException {
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    // handleRequest
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
        } else {
            handler.received(exchangeChannel, message);

    // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 发生异常情况时,会被取消执行
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);

        // find handler by message class.
        Object msg = req.getData();
        try {
            CompletionStage<Object> future = handler.reply(channel, msg);
            // 异步等待结果响应回调
            future.whenComplete((appResult, t) -> {
                try {
                    // 没有异常,就是正常
                    if (t == null) {
                    } else {
                } catch (RemotingException e) {
                    // 在客户端关闭连接时,发送消息将会失败
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
        } catch (Throwable e) {

    // org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 调用正常的rpc方法
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            CompletableFuture<Object> future = wrapWithFuture(value);
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse(invocation);
                if (t != null) {
                    if (t instanceof CompletionException) {
                    } else {
                } else {
                return result;
            // 包装返回结果
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);


3.2. server解析timeout信息

  server端仅在特殊情况下才会处理超时。它是在 TimeoutFilter 做的简单处理,仅将结果清空,然后正常返回了。

    // org.apache.dubbo.rpc.protocol.FilterNode#invoke
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
            asyncResult = filter.invoke(next, invocation);
        } catch (Exception e) {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                try {
                    Filter.Listener listener = listenableFilter.listener(invocation);
                    if (listener != null) {
                        listener.onError(e, invoker, invocation);
                } finally {
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                listener.onError(e, invoker, invocation);
            throw e;
        } finally {

        return asyncResult.whenCompleteWithContext((r, t) -> {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                Filter.Listener listener = listenableFilter.listener(invocation);
                try {
                    if (listener != null) {
                        if (t == null) {
                            listener.onResponse(r, invoker, invocation);
                        } else {
                            listener.onError(t, invoker, invocation);
                } finally {
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                if (t == null) {
                    listener.onResponse(r, invoker, invocation);
                } else {
                    listener.onError(t, invoker, invocation);

    // org.apache.dubbo.rpc.filter.ContextFilter#invoke
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, Object> attachments = invocation.getObjectAttachments();
        if (attachments != null) {
            Map<String, Object> newAttach = new HashMap<>(attachments.size());
            for (Map.Entry<String, Object> entry : attachments.entrySet()) {
                String key = entry.getKey();
                if (!UNLOADING_KEYS.contains(key)) {
                    newAttach.put(key, entry.getValue());
            attachments = newAttach;

        RpcContext context = RpcContext.getContext();
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
        String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY);
        if (StringUtils.isNotEmpty(remoteApplication)) {
        } else {
            context.setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY));
        // 此处为服务端的超时实现,通过 _TO:xx 配置,由客户端传导到服务端进行控制,当超时时,结果将被清空
        // 即此处的超时是伪超时,客户端实现的超时才是真实的
        long timeout = RpcUtils.getTimeout(invocation, -1);
        if (timeout != -1) {
            context.set(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));

        // merged from dubbox
        // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
        if (attachments != null) {
            if (context.getObjectAttachments() != null) {
            } else {

        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);

        try {
            return invoker.invoke(invocation);
        } finally {
            // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
    // org.apache.dubbo.rpc.filter.TimeoutFilter#invoke
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    // TimeoutFilter
    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        // "timeout-countdown"
        Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
        if (obj != null) {
            // 超时后,将结果清空
            TimeoutCountDown countDown = (TimeoutCountDown) obj;
            if (countDown.isExpired()) {
                ((AppResponse) appResponse).clear(); // clear response in case of timeout.
                if (logger.isWarnEnabled()) {
                    logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
                            Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
                            ", invoke elapsed " + countDown.elapsedMillis() + " ms.");


3.3. 服务端server的开启过程



View Code




注:本篇使用dubbo版本为 2.7.0 

About Joyk

Aggregate valuable and interesting links.
Joyk means Joy of geeK