16

0318 guava并发工具

 4 years ago
source link: https://segmentfault.com/a/1190000022069911
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

bYBZrur.png!web

并发是一个难题,但是可以通过使用强力简单的抽象来显著的简化,为了简化问题,guava扩展了Future接口,即 ListenableFuture (可以监听的Future)。

我强烈建议你在你的所有代码里使用ListenableFuture去替代Future,原因如下:

  • 很多的Futures 类的方法需要它。(Futures工具类使用)
  • 它比后来改造为ListenableFutrue更简单。(早点使用比重构更简单)
  • 工具方法的提供者不需要提供Future和ListenableFuture方法的变体。(不需要兼容两套)

接口

一个传统的Futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。

一个Future可以用在进度计算,或者说是 一个提供给我们结果的服务的承诺。

一个ListenableFuture允许注册当你在计算完成的时候的回调,或者计算已经完成了。

这个简单的增强让高效支持多种操作成为可能。而Future接口并不能支持。

ListenbleFuture中添加的基本操作是

addListener(Runnable , Executor ),

它指出了当未来计算完成时,指定的Runnable会在指定的Executor中运行。

增加回调

很多用户喜欢使用 Futures.addCallback(ListenableFuture,FutureCallback,Executor)方法。

FutureCallback实现了下面两个方法:

  • onSuccess(v) 当未来成功执行的动作,基于计算结果
  • onFailure(Throwable) 当未来失败执行的动作,基于失败

创建

相较于jdk提供的 ExecutorService.submit(Callable)方法来初始化一个异步计算。它返回一个常规的Future,

guava提供了ListeningExecutorService接口,它返回ListenableFuture。

把ExecutorService转换为ListenableExecutorService

使用:MoreExecutors.listeningDecorator(ExecutorService)

基础用法如下:

/**
 * 说明:使用例子代码
 * @author carter
 * 创建时间: 2020年03月19日 9:54 上午
 **/

@Slf4j
public class ListenableFutureUtils {

    public static void main(String[] args) {

ListeningExecutorService service = MoreExecutors.listeningDecorator(
    Executors.newFixedThreadPool(10));


        final ListenableFuture<AResult> listenableFuture = service.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new AResult(30, "male", 1);

        });


        Futures.addCallback(listenableFuture,
                new FutureCallback<AResult>() {
                    @Override
                    public void onSuccess(AResult aResult) {
                        log.info("计算成功,{}",aResult);
                    }

                    @Override
                    public void onFailure(Throwable throwable) {

                        log.error("计算错误",throwable);
                        
                    }
                },service);

    }
    
    @Data
    @AllArgsConstructor
    public static class AResult{
        
        private Integer age;
        
        private String sex;
        
        private Integer id;
        
        
    }
    
}

相对的,如果你想从基于FutureTask的API转换过来,

Guava提供了

ListenableFutureTask.create(Callable)

ListenableFutureTask.create(Runnable)

不同于jdk,ListenableFutureTask并不是直接扩展的。

如果你喜欢抽象的设置future的值,而不是实现一个方法然后计算值,可以考虑使用AbstractFuture或使用SettableFuture ;

如果你必须转换Future为ListenableFuture,你别无选择,必须使用 JdkFutureAdapters.listenInPoolThread(Future)来转换Future为ListenableFuture

任何时候只要可能,推荐你修改源码让它返回一个 ListenableFuture

应用

使用ListenablFuture最重要的原因是可以使用链式异步操作。

代码如下:

package com.xxx.demo;

import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * 说明:异步操作链
 * @author carter
 * 创建时间: 2020年03月19日 10:11 上午
 **/

public class ApplicationUtils {


    public static void main(String[] args) {

        Query query = new Query(30);

        ListenableFuture<RowKey> rowKeyFuture = lookUp(query);

        AsyncFunction<RowKey, QueryResult> queryFun = rowKey -> readData(rowKey);

        final ListenableFuture<QueryResult> queryResultListenableFuture = 
            Futures.transformAsync(rowKeyFuture, queryFun);

    }

    private static ListenableFuture<QueryResult> readData(RowKey rowKey) {
        return null;
    }

    private static ListenableFuture<RowKey> lookUp(Query query) {
        return null;
    }


    @Data
    @AllArgsConstructor
    public static class RowKey {

        private String id;

    }

    @Data
    @AllArgsConstructor
    public static class Query {

        private Integer age;

    }


    @Data
    @AllArgsConstructor
    public static class QueryResult {

        private String id;
        private String age;

    }


}

很多其他高效支持的操作ListenableFuture提供,而Future不提供。

不同的操作可以被不同的线程池执行,一个简单的ListenableFuture可以有多个操作去等待。

只要一个操作开始,其他多个操作应该开始,fan-out, 千帆竞发。

ListenableFuture可以实现这样的操作:它触发了所有请求的回调。

通过少量的工作,我们可以 fan-in.

触发一个ListenableFuture 来获得计算结果,当其他的Future结束的时候。

Futures.allAsList是一个例子。

方法介绍:

方法 描述 transformAsync(ListenableFuture , AsyncFunction , Executor) 返回一个新的ListenableFuture,它的结果是执行异步函数的返回,函数入参是ListenableFuture的返回结果; transform(ListenableFuture , Function , Executor) 返回一个新的ListenableFuture,它的结果是执行函数的返回,函数入参是ListenableFuture的返回结果; allAsList(Iterable<ListenableFuture>) 返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,任何一个ListenableFuture执行失败或者取消,最后的返回结果取消 successfullAsList(Iterable<ListenableFuture>) 返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,成功的是结果,失败或者取消的值使用null替代

AsyncFunction<A,B> 提供了一个方法 , ListenableFuture apply(A inpunt),它可以用来异步的转换值。

代码如下:

package com.xxx.demo;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * 说明:成功执行结果汇集
 * @author carter
 * 创建时间: 2020年03月19日 10:34 上午
 **/
@Slf4j
public class Test3 {

    public static void main(String[] args) {

        List<ListenableFuture<QueryResult>> querys = Lists.newLinkedList();
        final ListenableFuture<List<QueryResult>> successfulAsList =
            Futures.successfulAsList(querys);
        
        Futures.addCallback(successfulAsList, new FutureCallback<List<QueryResult>>() {
            @Override
            public void onSuccess(List<QueryResult> queryResults) {
                log.info("执行结果列表:{}",queryResults);
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("执行失败",throwable);
            }
        });


    }

    @Data
    @AllArgsConstructor
    public static class QueryResult{
        
        
      private  Integer age;
        
    }
    

}

嵌套的Future

你的代码调用一个通用接口并返回一个Future,很可能最终返回一个嵌套的Future.

package com.xxx.demo;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * 说明:嵌套的ListenableFuture
 * @author carter
 * 创建时间: 2020年03月19日 10:43 上午
 **/

public class Test4 {

    public static void main(String[] args) {


        final ListeningExecutorService executorService = MoreExecutors
            .listeningDecorator(Executors.newFixedThreadPool(2));
        final ListeningExecutorService otherExecutorService = MoreExecutors
            .listeningDecorator(Executors.newFixedThreadPool(2));


        Callable<Foo> otherCallback =  ()->new Foo("aaa");


        final ListenableFuture<ListenableFuture<Foo>> submit = 
                executorService.submit(() -> otherExecutorService.submit(otherCallback));


    }
    
    @Data
    @AllArgsConstructor
    public static class Foo{
        
        private String name;
    }
    
}

例子最后返回的是: ListenableFuture<ListenableFuture<Foo>> ,

这个代码不对,因为当外层的Future 取消的时候,无法传播到内层的Future,

这也是一个 使用get()检查别的Future或者Listnener的常规的错误,

但是,除非特别关注 否则 otherCallback抛出的异常会被压制。

为了避免这种情况,所有的guava的Future处理方法(有些从jdk来),有 *Async版本来安全的解开这个嵌套。

比如:transform,transformAsyn, submit, submitAsync方法。

深入研究

26VV7zb.png!web

原创不易,转载请注明出处,欢迎沟通交流。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK