1

你真的了解ForkJoinPool吗?这些技巧让你的代码性能提升十倍!

 1 year ago
source link: https://www.51cto.com/article/757568.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

你真的了解ForkJoinPool吗?这些技巧让你的代码性能提升十倍!

作者:你的老师父 2023-06-13 13:52:00
ForkJoinPool是Java 7中新增的一种线程池实现,它主要用于执行大量的计算密集型任务。ForkJoinPool采用“工作窃取”算法,即当某个线程的任务执行完毕后,会从其他线程的任务队列中窃取任务执行,从而实现负载均衡。
24e59b0572f80ad9d8f7226d4cce60d8d93f2c.png

1、线程池简介

线程池是一种常见的多线程编程方式,它可以有效地管理线程的创建、销毁和复用,从而提高程序的性能和稳定性。Java中提供了多种线程池实现,包括ForkJoinPool、Executors、CompletionService等。

2、ForkJoinPool

ForkJoinPool是Java 7中新增的一种线程池实现,它主要用于执行大量的计算密集型任务。ForkJoinPool采用“工作窃取”算法,即当某个线程的任务执行完毕后,会从其他线程的任务队列中窃取任务执行,从而实现负载均衡。

以下是一个使用ForkJoinPool的示例代码:

import java.util.concurrent.*;

public class ForkJoinPoolExample {
    public static void main(String[] args) {
        int n = 1000000;
        int[] array = new int[n];
        for (int i = 0; i < n; i++) {
            array[i] = i;
        }

        ForkJoinPool pool = new ForkJoinPool();
        int sum = pool.invoke(new SumTask(array, 0, n));
        System.out.println("Sum: " + sum);
    }
}

class SumTask extends RecursiveTask<Integer> {
    private int[] array;
    private int start;
    private int end;

    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= 1000) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            left.fork();
            right.fork();
            return left.join() + right.join();
        }
    }
}

以上代码创建了一个ForkJoinPool,用于计算一个长度为1000000的数组的元素之和。其中,SumTask是一个继承自RecursiveTask的任务类,用于将数组分成若干个子任务进行计算。当子任务的数量小于等于1000时,直接计算子任务的结果;否则,将子任务分成两个部分,分别交给左右两个子任务进行计算,最后将两个子任务的结果相加。

3、Executors

Executors是Java中提供的一个线程池工具类,它可以方便地创建各种类型的线程池。Executors提供了多个静态方法,用于创建不同类型的线程池,例如newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor等。

(1)newFixedThreadPool

newFixedThreadPool是一个固定大小的线程池,它会一直保持固定数量的线程,如果有新的任务提交,但线程池中的线程都在忙碌,那么新的任务就会进入等待队列中等待执行。

以下是一个使用newFixedThreadPool创建线程池的示例代码:

import java.util.concurrent.*;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Task executed by " + Thread.currentThread().getName());
                }
            };
            executor.execute(task);
        }
        executor.shutdown();
    }
}

以上代码创建了一个固定大小为5的线程池,提交了20个任务,并关闭了线程池。每个任务的执行会输出执行线程的名称。

(2)newCachedThreadPool

newCachedThreadPool是一个可缓存的线程池,它会根据需要创建新的线程,如果有线程空闲时间超过60秒,就会被回收。如果有新的任务提交,但线程池中的线程都在忙碌,那么就会创建新的线程来处理任务。

以下是一个使用newCachedThreadPool创建线程池的示例代码:

import java.util.concurrent.*;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Task executed by " + Thread.currentThread().getName());
                }
            };
            executor.execute(task);
        }
        executor.shutdown();
    }
}

以上代码创建了一个可缓存的线程池,提交了20个任务,并关闭了线程池。每个任务的执行会输出执行线程的名称。

(3)newSingleThreadExecutor

newSingleThreadExecutor是一个单线程的线程池,它会保证所有任务按照顺序执行,即每个任务都会在前一个任务执行完毕后才会执行。

以下是一个使用newSingleThreadExecutor创建线程池的示例代码:

import java.util.concurrent.*;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 20; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Task executed by " + Thread.currentThread().getName());
                }
            };
            executor.execute(task);
        }
        executor.shutdown();
    }
}

以上代码创建了一个单线程的线程池,提交了20个任务,并关闭了线程池。每个任务的执行会输出执行线程的名称。

4、CompletionService

CompletionService是Java中提供的一个用于异步执行任务的工具类,它可以方便地获取已完成的任务的结果。CompletionService内部维护了一个阻塞队列,用于存储已完成的任务的结果。

以下是一个使用CompletionService的示例代码:

import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            Callable<Integer> task = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep((long) (Math.random() * 1000));
                    return index;
                }
            };
            completionService.submit(task);
        }
        for (int i = 0; i < 10; i++) {
            Future<Integer> future = completionService.take();
            System.out.println("Result: " + future.get());
        }
        executor.shutdown();
    }
}

以上代码创建了一个固定大小为5的线程池,提交了10个任务,并使用CompletionService获取已完成的任务的结果。每个任务会随机休眠一段时间,然后返回任务的编号。在主线程中,使用completionService.take()方法获取已完成的任务的结果,并输出任务的编号。

5、Callable和Future

Callable是Java中提供的一个接口,它类似于Runnable接口,但是可以返回执行结果。Future是Java中提供的一个接口,它可以用于获取异步执行任务的结果。

以下是一个使用Callable和Future的示例代码:

import java.util.concurrent.*;

public class CallableAndFutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(1000);
                return 1 + 2;
            }
        };
        Future<Integer> future = executor.submit(task);
        System.out.println("Result: " + future.get());
        executor.shutdown();
    }
}

以上代码创建了一个单线程的线程池,提交了一个任务,并使用Future获取任务的执行结果。任务会休眠1秒钟,然后返回1+2的结果。在主线程中,使用future.get()方法获取任务的执行结果,并输出结果。

6、Runnable和Thread

Runnable是Java中提供的一个接口,它表示一个可以被线程执行的任务。Thread是Java中提供的一个类,它表示一个线程。

以下是一个使用Runnable和Thread的示例代码:

public class RunnableAndThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println("Task executed by " + Thread.currentThread().getName());
            }
        };
        Thread thread = new Thread(task);
        thread.start();
        thread.join();
    }
}

以上代码创建了一个任务,并使用Thread将任务提交到一个新的线程中执行。在主线程中,使用thread.join()方法等待新线程执行完毕。每个任务的执行会输出执行线程的名称。

本文介绍了Java中常见的线程池实现,包括ForkJoinPool、Executors、CompletionService、Callable、Future、Runnable等知识点的详细讲解和完整可运行的代码示例。线程池是Java中常见的多线程编程方式,它可以有效地管理线程的创建、销毁和复用,从而提高程序的性能和稳定性。在实际开发中,需要根据具体的需求选择合适的线程池实现。

责任编辑:姜华 来源: 今日头条

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK