2

CompletableFuture和Future介绍

 2 years ago
source link: https://perkins4j2.github.io/posts/13999/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

CompletableFuture和Future介绍

发表于

2020-05-19

|

更新于 2020-06-08

| 分类于 架构

| 阅读次数:

本文字数: 5.6k

|

阅读时长 ≈ 5 分钟

Future是JDK1.5后提供的异步调用接口。
CompletableFuture是JDK8提出的一个支持非阻塞的多功能的异步类,实现了Future。

与Future区别

  • CompletableFuture支持回调,可指定线程池
  • CompletableFuture支持链式、顺序和聚合等执行
  • CompletableFuture支持不阻塞主线程且无需轮询等待状态
  • 都是提交后直接执行,并在get时阻塞等待

CompletableFuture主要方法

  • thenRun,前一个执行后再同步执行
  • thenRunAsync,前一个执行后再异步执行
  • runAsync,无返回值的异步执行
  • supplyAsync,有返回值的异步执行
  • allOf,聚合多个调用
  • get,阻塞等待执行完成

CompletableFuture异步执行Demo

public class CompletableFutureTest {
public static void main(String[] args) {
//线程池
Executor executor = Executors.newFixedThreadPool(10);

//异步列表
List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();

//输入列表
List<Integer> sceneList = new ArrayList<>();
sceneList.add(1);
sceneList.add(2);
sceneList.add(3);

sceneList.forEach(e -> {
//异步
CompletableFuture<Boolean> completableFuture =
CompletableFuture.supplyAsync(() -> execute(e), executor);
completableFutureList.add(completableFuture);
});

try {
//聚合阻塞
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(1, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}

private static boolean execute(Integer number) {
return number > 0;
}
}

Future 阻塞Demo

while (true) {
//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
if (future.isDone()&& !future.isCancelled()) {
//获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
Integer i = future.get();//获取结果
System.out.println("任务i="+i+"获取完成!"+new Date());
list.add(i);
break;//当前future获取结果完毕,跳出while
} else {
Thread.sleep(1);
//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
}
}

CompletableFuture-allOf问题

allOf等待所有任务执行完成,直到timeout超时。

CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(timeout, TimeUnit.MILLISECONDS);

若timeout超时后,所有任务均返回失败;类似事务操作,导致已获取结果的任务后续处理异常,无法容错。

正确使用方法

allOf不限制等待时间,子任务内部限定。具体操作如下:

@Slf4j
public class CompletableFutureUtil {

private List<CompletableFuture<Void>> completableFutureList = new ArrayList<>(1000);
public static ScheduledExecutorService es = Executors.newScheduledThreadPool(20);

public static CompletableFutureUtil newInstance() {
return new CompletableFutureUtil();
}

public CompletableFutureUtil runAsync(Runnable runnable, int timeout) {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(runnable, ThreadPool.POOL);
completableFutureList.add(timeoutAfter(timeout, completableFuture));
return this;
}

public CompletableFutureUtil runAsync(Runnable runnable) {
completableFutureList.add(CompletableFuture.runAsync(runnable, ThreadPool.POOL));
return this;
}

public void finish(int timeout) {
try {
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.warn("timeout,caller:{},size:{}", getMethod(), completableFutureList.size());
} catch (Exception e) {
log.error("error,caller:{},size:{}", getMethod(), completableFutureList.size(), e);
}
}

public void finish() {
try {
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get();
} catch (InterruptedException e) {
log.warn("interrupted,caller:{},size:{}", getMethod(), completableFutureList.size());
} catch (Exception e) {
log.error("error,caller:{},size:{}", getMethod(), completableFutureList.size(), e);
}
}

private <T> CompletableFuture<T> timeoutAfter(int timeout,
CompletableFuture<T> f) {
es.schedule(() -> f.complete(null), timeout, TimeUnit.MILLISECONDS);
return f;
}

public static String getMethod() {
StackTraceElement caller = (new Throwable()).getStackTrace()[2];

return caller.getClassName() + "@" + caller.getMethodName();
}
}

调用方法:

CompletableFutureUtil completableFutureUtil = CompletableFutureUtil.newInstance();

list.forEach(key -> completableFutureUtil.runAsync(() -> {
count.incrementAndGet();
}, 1000));

try {
completableFutureUtil.finish();
} catch (Exception e) {
log.error("error ", e);
}
  • 异步任务内部进行限定超时时间,runAsync(Runnable runnable, int timeout)
  • 内部由定时任务进行扫描和容错处理(返回null),es.schedule(() -> f.complete(null), timeout, TimeUnit.MILLISECONDS);
  • allOf不限定执行时候,直接get等待
------ 本文结束------

本文标题:CompletableFuture和Future介绍

文章作者:Perkins

发布时间:2020年05月19日

原始链接:https://perkins4j2.github.io/posts/13999/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK