11

CompletableFuture并行处理List分批数据

 2 years ago
source link: https://segmentfault.com/a/1190000041454105
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

CompletableFuture并行处理List分批数据

发布于 今天 04:47
import cn.hutool.core.util.RandomUtil;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.junit.Test;
import org.springframework.util.StopWatch;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * @author nieweijun
 * @since 2022/2/24 10:53
 */
public class TestCf {
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(512));

    @Test
    public void testTotalAmount() {
        final BigDecimal[] sum = {new BigDecimal(0)};
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        // 1. 假设 sku库存对象的id是0到10000, 先初始化数据id
        List<SkuInventory> skuInventories = IntStream.rangeClosed(1, 10000).boxed()
                .map(i -> SkuInventory.builder().id(String.valueOf(i)).build())
                .collect(Collectors.toList());

        // 2. 查询所有库存, 计算价值总额
        // 模拟每一条数据:批量/每次100个
        List<List<SkuInventory>> partitionsList = Lists.partition(skuInventories, 100);

        // 3. 查算:使用异步
        List<CompletableFuture> cfList = new ArrayList<>();
        CompletableFuture[] cfArray = new CompletableFuture[partitionsList.size()];
        partitionsList.stream().forEach(partition -> {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> queryBatch(partition), executor);
            cfList.add(future);
        });
        CompletableFuture.allOf(cfList.toArray(cfArray)).join(); // 全执行完

        // 4.统计
        skuInventories.stream().forEach(e -> {
            BigDecimal multiply = BigDecimal.valueOf(e.getCount()).multiply(e.getPrice());
            sum[0] = sum[0].add(multiply);
        });

        stopWatch.stop();
        System.out.println("结果是: " + sum[0]);
        System.out.println("耗时毫秒数为: " + stopWatch.getTotalTimeMillis());
    }

    // 模拟查询数据; 设置数据值
    private void queryBatch(List<SkuInventory> partList) {
        if (CollectionUtils.isEmpty(partList)) {
            return;
        }
        long millisCost = RandomUtil.randomInt(100, 1000);
        // 模拟查询耗时
        try {
            TimeUnit.MILLISECONDS.sleep(millisCost);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 模拟赋值: 每个数量为10, 价格1.0
        partList.stream().forEach(e -> {
            e.setCount(10);
            e.setPrice(new BigDecimal("1.0"));
        });
        System.out.println("==========一批partList处理完成耗时[" + millisCost + "ms]==========");
    }

    // sku库存对象
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    static class SkuInventory {
        /** id */
        private String id;
        /** 库存数量 */
        private Integer count;
        /** 单价 */
        private BigDecimal price;
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK