6

Java大型数据集合实现并行加速处理几种方法 - DZone

 2 years ago
source link: https://www.jdon.com/60455
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Java大型数据集合实现并行加速处理几种方法 - DZone
在这篇文章中,一个非常简单的转换操作将被应用于一个大型的Java数据集合。

转换操作
对于转换操作,我们定义了一个函数接口。它只是接收一个R类型的元素,应用一个转换操作,并返回一个S类型的转换对象。

@FunctionalInterface
public interface ElementConverter<R, S> {
    S apply(R param);
}

我们创建了ElementConverter接口的两个实现,其中一个将一个字符串转换为一个大写的字符串。

public class UpperCaseConverter implements ElementConverter<String, String> {
    @Override
    public String apply(String param) {
        return param.toUpperCase();
    }
}

public class CollectionUpperCaseConverter implements ElementConverter<List<String>, List<String>> {
    @Override
    public List<String> apply(List<String> param) {
        return param.stream().map(String::toUpperCase).collect(Collectors.toList());
    }
}

还实现了一个异步执行器(AsynchronousExecutor)类,除了一些其他辅助性的方法外,还为并行处理策略提供了一个专门的方法。

public class AsynchronousExecutor<T, E> {

    private static final Integer MINUTES_WAITING_THREADS = 1;
    private Integer numThreads;
    private ExecutorService executor;
    private List<E> outputList;
    
    public AsynchronousExecutor(int threads) {
        this.numThreads = threads;
        this.executor = Executors.newFixedThreadPool(this.numThreads);
        this.outputList = new ArrayList<>();
    }
  
    // Methods for each parallel processing strategy
  
      public void shutdown() {
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(MINUTES_WAITING_THREADS, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

子列表分区
第一个提高对集合的转换操作的并行策略是基于java.util.AbstractList的扩展。
简而言之,CollectionPartitioner将一个源集合分割成子列表,子列表的大小是根据处理过程中使用的线程数计算的。
首先,通过取源集合大小和线程数之间的商来计算分块大小。
然后,根据成对的索引(fromIndex,toIndex)从源集合中复制每个子列表,这些索引的值是同步计算的。

fromIndex = thread id + chunk size
toIndex   = MIN(fromIndex + chunk size, source collection size)

public final class CollectionPartitioner<T> extends AbstractList<List<T>> {

    private final List<T> list;
    private final int chunkSize;
    
    public CollectionPartitioner(List<T> list, int numThreads) {
        this.list = list;
        this.chunkSize = (list.size() % numThreads == 0) ? 
                  (list.size() / numThreads) : (list.size() / numThreads) + 1;
    }
    
    @Override
    public synchronized List<T> get(int index) {
        var fromIndex = index * chunkSize;
        var toIndex = Math.min(fromIndex + chunkSize, list.size());
        
        if (fromIndex > toIndex) {
            return Collections.emptyList(); // Index out of allowed interval
        }
        
        return this.list.subList(fromIndex, toIndex); 
    }

    @Override
    public int size() {
        return (int) Math.ceil((double) list.size() / (double) chunkSize);
    }
}

一旦每个线程将转换操作应用于其各自子列表中的所有对象,它必须同步地将修改后的对象添加到输出列表中。这些步骤由AsynchronousExecutor类的一个特定方法指导。

public class AsynchronousExecutor<T, E> {
      public void processSublistPartition(List<T> inputList, ElementConverter<List<T>, List<E>> converter) {
        var partitioner = new CollectionPartitioner<T>(inputList, numThreads);    
        IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {        
            var thOutput = converter.apply(partitioner.get(t));            
            if (Objects.nonNull(thOutput) && !thOutput.isEmpty()) {
                synchronized (this.outputList) {
                    this.outputList.addAll(thOutput);
                }
            }
        }));
    }
}

浅层分割
第二个并行处理策略挪用了浅层拷贝概念背后的想法。事实上,参与处理的线程并没有收到从源集合复制的子列表。相反,每个线程使用子列表分区策略的相同代数计算各自的一对索引(fromIndex,toIndex),并直接在源集合上操作。但是,作为问题的一个要求,我们假设源集合不能被修改。在这种情况下,线程根据他们对源集合的分片来读取对象,并将新转换的对象存储在一个与原始集合相同大小的新集合中。

请注意,这种策略在转换操作过程中没有任何同步执行点,也就是说,所有线程都是完全独立地执行它们的任务。 但是组装输出集合至少可以使用两种不同的方法。

1、基于列表的浅层分割
在这种方法中,在处理集合之前,会创建一个由默认元素组成的新列表。这个新列表的互不相干的片断--以索引对(fromIndex, toIndex)为界限--被线程访问。它们存储了从源集合中读取各自片断所产生的每个新对象。AsynchronousExecutor类的一个新方法专门用于这种方法。

public class AsynchronousExecutor<T, E> {
      public void processShallowPartitionList(List<T> inputList, ElementConverter<T, E> converter) {    
        var chunkSize = (inputList.size() % this.numThreads == 0) ? 
                  (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1;
        this.outputList = new ArrayList<>(Collections.nCopies(inputList.size(), null));
        
        IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {            
            var fromIndex = t * chunkSize;
            var toIndex = Math.min(fromIndex + chunkSize, inputList.size());
            
            if (fromIndex > toIndex) {
                fromIndex = toIndex;
            }
            
            IntStream.range(fromIndex, toIndex)
                          .forEach(i -> this.outputList.set(i, converter.apply(inputList.get(i))));
        }));
    }
}

2、基于数组的浅层分区
这种方法与之前的方法不同,只是因为线程使用数组来存储转换后的新对象,而不是一个列表。在所有线程完成其操作后,数组被转换为输出列表。同样,在AsynchronousExecutor类中为这个策略添加了一个新方法。

public class AsynchronousExecutor<T, E> {
  
    public void processShallowPartitionArray(List<T> inputList, ElementConverter<T, E> converter) 
        var chunkSize = (inputList.size() % this.numThreads == 0) ? 
                  (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1;
        Object[] outputArr = new Object[inputList.size()];
        IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {
            
            var fromIndex = t * chunkSize;
            var toIndex = Math.min(fromIndex + chunkSize, inputList.size());
            
            if (fromIndex > toIndex) {
                fromIndex = toIndex;
            }
            
            IntStream.range(fromIndex, toIndex)
                          .forEach(i -> outputArr[i] = converter.apply(inputList.get(i)));
        }));
        
        this.shutdown();
        this.outputList = (List<E>) Arrays.asList(outputArr);
    }
}

由于所有测试都是在 4 核和每核 2 个线程的机器上进行的,因此预计该策略的加速率会随着使用多达 8 个线程而增加。尽管图表反映了这种行为,但该算法达到的最大加速比为 4.4X。1000 万个对象的集合达到了非常相似的比率

理想情况下,通过使用 8 个线程,加速比应该对应于 CPU 时间的 8 倍改进。

详细点击标题


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK