6

深入理解线程池

 2 years ago
source link: http://www.lzhpo.com/article/128
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

使用线程池的好处

  1. 降低资源消耗
    可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度
    当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性
    线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

不使用线程池的坏处

  1. 频繁的线程创建和销毁会占用更多的CPU和内存。
  2. 频繁的线程创建和销毁会对GC产生比较大的压力。
  3. 线程太多,线程切换带来的开销将不可忽视。
  4. 线程太少,多核CPU得不到充分利用,是一种浪费。

线程池的工作原理

当一个新的任务提交到线程池之后:

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
  2. 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步。
  3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
    线程池的工作流程.png

ThreadPoolExecutor的处理流程

ThreadPoolExecutor的处理流程.png

Executors

Executors是一个线程池工厂,提供了很多的工厂方法,我们来看看它大概能创建哪些线程池。

// 创建单一线程的线程池public static ExecutorService newSingleThreadExecutor();// 创建固定数量的线程池public static ExecutorService newFixedThreadPool(int nThreads);// 创建带缓存的线程池public static ExecutorService newCachedThreadPool();// 创建定时调度的线程池public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);// 创建流式(fork-join)线程池public static ExecutorService newWorkStealingPool();

创建单一线程的线程池

故名思意,这个线程池只有一个线程。若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理。

创建固定数量的线程池

和创建单一线程的线程池类似,只是这儿可以并行处理任务的线程数更多一些罢了。若多个任务被提交到此线程池,会有下面的处理过程。

如果线程的数量未达到指定数量,则创建线程来执行任务
如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务
如果没有线程是空闲的,则将任务缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理.

创建带缓存的线程池

这种方式创建的线程池,核心线程池的长度为0,线程池最大长度为Integer.MAX_VALUE。由于本身使用SynchronousQueue作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。

创建定时调度的线程池

和上面3个工厂方法返回的线程池类型有所不同,它返回的是ScheduledThreadPoolExecutor类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解

手动创建线程池

ThreadPoolExecutor构造方法有7个参数.png

ThreadPoolExecutor源码:

    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.acc = System.getSecurityManager() == null ?                null :                AccessController.getContext();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }

ThreadPoolExecutor构造方法有7个参数

  1. corePoolSize:线程池中的核心线程数。
  2. maximumPoolSize:线程池中的最大线程数。
  3. keepAliveTime:空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。
  4. unit:空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等。
  5. workQueue:等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象。
  6. threadFactory:线程工厂,我们可以使用它来创建一个线程。
  7. handler:拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理。

为什么阿里Java规约禁止使用Java内置Executors创建线程池?

阿里巴巴Java规约中让我们手动创建线程池效果更好哦!
其实可以从ThreadPoolExecutor构造方法的7个参数出发。
规约中的原话:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

等待队列-workQueue

等待队列是BlockingQueue类型的,理论上只要是它的子类,都可以用来作为等待队列。

JDK中自带的一些阻塞队列

  1. ArrayBlockingQueue:队列是有界的,基于数组实现的阻塞队列。
  2. LinkedBlockingQueue:队列可以有界,也可以无界。基于链表实现的阻塞队列。
  3. SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()的默认队列。
  4. PriorityBlockingQueue:带优先级的无界阻塞队列。

通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。

线程工厂-threadFactory

ThreadFactory接口

ThreadFactory是一个接口,只有一个方法。
ThreadFactory是一个接口,只有一个方法。.png

Executors的实现使用了默认的线程工厂-DefaultThreadFactory。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}。
Executors采用了默认的DefaultThreadFactory线程工厂.png
源代码:

    /**     * The default thread factory     */    static class DefaultThreadFactory implements ThreadFactory {        private static final AtomicInteger poolNumber = new AtomicInteger(1);        private final ThreadGroup group;        private final AtomicInteger threadNumber = new AtomicInteger(1);        private final String namePrefix;        DefaultThreadFactory() {            SecurityManager s = System.getSecurityManager();            group = (s != null) ? s.getThreadGroup() :                                  Thread.currentThread().getThreadGroup();            namePrefix = "pool-" +                          poolNumber.getAndIncrement() +                         "-thread-";        }        public Thread newThread(Runnable r) {            Thread t = new Thread(group, r,                                  namePrefix + threadNumber.getAndIncrement(),                                  0);            if (t.isDaemon())                t.setDaemon(false);            if (t.getPriority() != Thread.NORM_PRIORITY)                t.setPriority(Thread.NORM_PRIORITY);            return t;        }    }

自定义线程名称就是实现ThreadFactory

/** * 带有名称的线程工厂 * <p>为什么需要定义线程的名称? * 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。 */class MyThreadFactory implements ThreadFactory {    /**     * 线程名称     */    private final String threadName;    /**     * 构造器:传入线程名称,设置线程名称     */    MyThreadFactory(String threadName) {        this.threadName = threadName;    }    @Override    public Thread newThread(Runnable r) {        Thread t = new Thread(r, threadName);        if (t.isDaemon()) {            // 此线程是否是守护程序线程            t.setDaemon(true);        }        if (t.getPriority() != Thread.NORM_PRIORITY) {            // 分配给线程的默认优先级,默认NORM_PRIORITY为5            t.setPriority(Thread.NORM_PRIORITY);        }        return t;    }}

拒绝策略/线程池饱和策略-handler

什么是拒绝策略?

就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他…

JDK有哪些拒绝策略?

JDK自带4种拒绝策略

JDK自带4种拒绝策略,分别是:

  1. CallerRunsPolicy:在调用者线程执行。
    自实现CallerRunsPolicy类似:

    /*** 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。*/class MyCallerRunsPolicy implements RejectedExecutionHandler { public MyCallerRunsPolicy() { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {     if (!executor.isShutdown()) {         r.run();     } }}
  2. AbortPolicy:直接抛出RejectedExecutionException异常。
    自实现AbortPolicy类似:

    /*** 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。* <p>* 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。*/class MyAbortPolicy implements RejectedExecutionHandler { public MyAbortPolicy() { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {     throw new RejectedExecutionException("Task " + r.toString() +             " rejected from " +             e.toString()); }}
  3. DiscardPolicy:任务直接丢弃,不做任何处理。

    /*** 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。*/class MyDiscardPolicy implements RejectedExecutionHandler { public MyDiscardPolicy() { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { }}
  4. DiscardOldestPolicy:丢弃队列里最旧的那个任务,再尝试执行当前任务。

    /*** 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去*/class MyDiscardOldestPolicy implements RejectedExecutionHandler { public MyDiscardOldestPolicy() { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {     if (!executor.isShutdown()) {         executor.getQueue().poll();         executor.execute(r);     } }}

    如何使用?

         // 线程池拒绝策略:DiscardPolicy => 直接丢弃//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());     // 自实现DiscardPolicy//        executor.setRejectedExecutionHandler(new MyDiscardPolicy());     // 线程池拒绝策略:AbortPolicy => 直接抛异常//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());     // 自实现MyAbortPolicy//        executor.setRejectedExecutionHandler(new MyAbortPolicy());     // 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());     // 自实现MyCallerRunsPolicy//        executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());     // 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());     // 自实现MyDiscardOldestPolicy//        executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());     // 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。     executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());

提交任务的两种方式

提及任务的方式有两种,分别是:submit和execute

这两个方法的区别:

  • submit:submit()用于提交一个需要返回果的任务。该方法返回一个Future对象,通过调用这个对象的get()方法,我们就能获得返回结果。get()方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit),这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException。
    submit(Runnable task)源代码:
      public Future<?> submit(Runnable task) {      if (task == null) throw new NullPointerException();      RunnableFuture<Void> ftask = newTaskFor(task, null);      execute(ftask);      return ftask;  }
  • execute:execute()用于提交不需要返回结果的任务。
    execute源代码:
      public void execute(Runnable command) {      if (command == null)          throw new NullPointerException();      int c = ctl.get();      if (workerCountOf(c) < corePoolSize) {          if (addWorker(command, true))              return;          c = ctl.get();      }      if (isRunning(c) && workQueue.offer(command)) {          int recheck = ctl.get();          if (! isRunning(recheck) && remove(command))              reject(command);          else if (workerCountOf(recheck) == 0)              addWorker(null, false);      }      else if (!addWorker(command, false))          reject(command);  }

关闭线程池的两种方式

可以调用线程池对象的shutdown()和shutdownNow()方法来关闭线程池。

这两个方法的区别:

  • shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
    shutdown源代码:
      public void shutdown() {      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          // 确保安全关闭          checkShutdownAccess();          // 将线程池状态置为SHUTDOWN          advanceRunState(SHUTDOWN);          // 不再接受新任务          interruptIdleWorkers();          onShutdown(); // hook for ScheduledThreadPoolExecutor      } finally {          mainLock.unlock();      }      tryTerminate();  }
  • shutdownNow()会将线程池状态置为STOP,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
    shutdownNow源代码:
      public List<Runnable> shutdownNow() {      List<Runnable> tasks;      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          // 确保安全关闭          checkShutdownAccess();          // 将线程池状态置为STOP          advanceRunState(STOP);          // 打断所有线程          interruptWorkers();          // 清空队列          tasks = drainQueue();      } finally {          mainLock.unlock();      }      tryTerminate();      // 并将队列中的任务返回回来      return tasks;  }

另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()和isTerminated,分别表示是否关闭和是否终止。

如何正确配置线程池的参数?

  1. 任务的性质:CPU密集型、IO密集型和混杂型。
  2. 任务的优先级:高中低。
  3. 任务执行的时间:长中短。
  4. 任务的依赖性:是否依赖数据库或者其他系统资源。

通常来说,如果任务属于CPU密集型,那么我们可以将线程池数量设置成CPU的个数,以减少线程切换带来的开销。如果任务属于IO密集型,我们可以将线程池数量设置得更多一些,比如CPU个数*2。

可以通过Runtime.getRuntime().availableProcessors()来获取CPU的个数。

线程池监控

如果系统中大量用到了线程池,那么我们是不是有必要对线程池进行监控。
这样子有助于我们定位出现的问题。

ThreadPoolExecutor自带了一些方法:

  1. long getTaskCount():获取已经执行或正在执行的任务数。
  2. long getCompletedTaskCount():获取已经执行的任务数。
  3. int getLargestPoolSize():获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过。
  4. int getPoolSize():获取线程池线程数。
  5. int getActiveCount():获取活跃线程数(正在执行任务的线程数)。
  1. protected void beforeExecute(Thread t, Runnable r):任务执行之前调用。
  2. protected void afterExecute(Runnable r, Throwable t):任务执行之后调用。
  3. protected void terminated():线程池结束之后调用。

遇到的一个问题

package com.lzhpo.threadpool.demo3;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 遇到的一个问题 * * @author lzhpo */public class AProblem {    static class DivTask implements Runnable {        int a,b;        public DivTask(int a, int b) {            this.a = a;            this.b = b;        }        @Override        public void run() {            double result = a / b;            System.out.println(result);        }    }    public static void main(String[] args) {        ExecutorService executor = Executors.newFixedThreadPool(5);        for (int i = 0; i < 5; i++) {            executor.submit(new DivTask(100, i));        }    }}

运行结果:

100.025.033.050.0
  1. 我明明第一次的时候除数为0,为什么不报错?
  2. 按理论来说,应该是有5次输出的,为什么只有三次?
    线程池submit的问题.png

解决办法:
对submit的返回值进行处理。
因为submit是一个非阻塞的方法,就是不管你发生什么错误,我都会执行下去。
线程池submit的问题-解决办法.png

  1. 尽量使用手动的方式创建线程池,避免使用Executors工厂类。
  2. 根据场景,合理设置线程池的各个参数,包括线程池数量、队列、线程工厂和拒绝策略。
  3. 在调线程池submit()方法的时候,一定要尽量避免任务执行异常被吞掉的问题。

HandCreateThreadPoolDemo1

package com.lzhpo.threadpool.demo3;import java.util.Random;import java.util.concurrent.*;/** * 手动创建线程池 * * @author lzhpo */public class HandCreateThreadPoolDemo1 {    public static void main(String[] args) {        ThreadPoolExecutor executor = new ThreadPoolExecutor(                1,                1,                1,                TimeUnit.SECONDS,                // 线程池缓冲队列                new LinkedBlockingDeque<>(10),                // 自定义ThreadFactory线程工厂                new MyThreadFactory("HandCreateThreadPoolDemo1")) {            @Override            protected void beforeExecute(Thread t, Runnable r) {                System.out.println("I'm beforeExecute.");            }            @Override            protected void afterExecute(Runnable r, Throwable t) {                System.out.println("I'm afterExecute.");            }            @Override            protected void terminated() {                System.out.println("I'm terminated.");            }        };        /**         * 线程池拒绝策略         */        // 线程池拒绝策略:DiscardPolicy => 直接丢弃//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());        // 自实现DiscardPolicy//        executor.setRejectedExecutionHandler(new MyDiscardPolicy());        // 线程池拒绝策略:AbortPolicy => 直接抛异常//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());        // 自实现MyAbortPolicy//        executor.setRejectedExecutionHandler(new MyAbortPolicy());        // 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        // 自实现MyCallerRunsPolicy//        executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());        // 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());        // 自实现MyDiscardOldestPolicy//        executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());        // 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。        executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());        /**         * 提交任务         */        // 方法1:submit,非阻塞方法,有返回结果,也就是Future对象。        executor.submit(() -> {            System.out.println("This is a task.");            System.out.println(Thread.currentThread().getName());            ;        });        // 方法2:execute。没有返回结果。//        executor.execute(() -> {//            System.out.println("This is a task.");//        });        /**         * 关闭线程池         */        // 方法1:shutdown。shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。        executor.shutdown();        // 方法2:立马结束,并且清空任务队列//        executor.shutdownNow();    }}//--------------------自定义线程名称------------------------/** * 带有名称的线程工厂 * <p>为什么需要定义线程的名称? * 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。 */class MyThreadFactory implements ThreadFactory {    /**     * 线程名称     */    private final String threadName;    /**     * 构造器:传入线程名称,设置线程名称     */    MyThreadFactory(String threadName) {        this.threadName = threadName;    }    @Override    public Thread newThread(Runnable r) {        Thread t = new Thread(r, threadName);        if (t.isDaemon()) {            // 此线程是否是守护程序线程            t.setDaemon(true);        }        if (t.getPriority() != Thread.NORM_PRIORITY) {            // 分配给线程的默认优先级,默认NORM_PRIORITY为5            t.setPriority(Thread.NORM_PRIORITY);        }        return t;    }}//--------------------线程池拒绝策略------------------------/** * 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。 * <p> * 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。 */class MyAbortPolicy implements RejectedExecutionHandler {    public MyAbortPolicy() {    }    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        throw new RejectedExecutionException("Task " + r.toString() +                " rejected from " +                e.toString());    }}/** * 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。 */class MyCallerRunsPolicy implements RejectedExecutionHandler {    public MyCallerRunsPolicy() {    }    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        if (!executor.isShutdown()) {            r.run();        }    }}/** * 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。 */class MyDiscardPolicy implements RejectedExecutionHandler {    public MyDiscardPolicy() {    }    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {    }}/** * 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去 */class MyDiscardOldestPolicy implements RejectedExecutionHandler {    public MyDiscardOldestPolicy() {    }    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        if (!executor.isShutdown()) {            executor.getQueue().poll();            executor.execute(r);        }    }}/** * 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。 */class MyRejectedExecutionHandler implements RejectedExecutionHandler {    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        new Thread(r, "新线程" + new Random().nextInt(10)).start();    }}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK