4

Java源码分析专题系列之【ThreadPoolExecutor】深入浅出的源码分析

 3 years ago
source link: https://my.oschina.net/liboware/blog/5063134
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

线程池执行任务的流程

  • 如果线程池工作线程数<corePoolSize,创建新线程执行task,并不断轮训t等待队列处理task。

  • 如果线程池工作线程数>=corePoolSize并且等待队列未满,将task插入等待队列。

  • 如果线程池工作流程数>=corePoolSize并且等待队列已满,且工作线程数<maximumPoolSize,创建新线程执行task。

  • 如果线程池工作流程数>=corePoolSize并且等待队列已满,且工作线程数=maximumPoolSize,执行拒绝策略。

execute()原理

public void execute(Runnable command) {        
	if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 
         * 如果运行的线程数小于corePoolSize,尝试创建一个新线程(Worker),并执行			* 它的第一个任务command。
         * 
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 
         *  如果task成功插入等待队列,我们仍需要进行双重校验是否可以成功添加一个线程
      * (因为有的线程可能在我们上次检查以后已经死掉了)或者在我们进入这个
      *  方法后线程池已经关闭了。
      *  
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 
           如果等待队列已满,我们尝试新创建一个线程。如果创建失败,我们知道线程已关闭
           或者已饱和,因此我们拒绝改任务。
         */
        int c = ctl.get();
        // 高3位表示状态,低29位任务数量。
      //工作线程小于核心线程数,创建新的线程。
        if (workerCountOf(c) < corePoolSize) {
        //创建新的worker立即执行command,并且轮训workQueue处理task。
        (核心线程数)
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        }
     //线程池在运行状态且可以将task插入队列
     //第一次校验线程池在运行状态
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
        //第二次校验,防止在第一次校验通过后线程池关闭。
        如果线程池关闭,在队列中删除task并拒绝task
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果线程数=0(线程都死掉了,比如:corePoolSize=0),新建线程且未指
            // 定firstTask,仅仅去轮训workQueue
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
     //线程队列已满,尝试创建新线程执行task,创建失败后拒绝task
        //创建失败原因:1.线程池关闭;2.线程数已经达到maxPoolSize
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker()原理 线程管理部分的分水岭-方法调用之前 都是任务管理(任务的创建、以及拒绝、添加任务队列等,任务线程池的状态监控等),方法调用之后属于线程的管理。(线程的执行和阻塞等)

  • 参数:
    • firstTask:worker线程的初始任务,可以为空。
    • core:true:将corePoolSize作为上限,false:将maximumPoolSize作为上限

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
         //外层循环判断线程池的状态
         for (;;) {
            //在进行获取一次上下文操作机制
            int c = ctl.get();
            int rs = runStateOf(c);
             //线程池状态
            // Check if queue empty only if necessary.
          // 线程池状态:RUNNING = -1、SHUTDOWN = 0、 STOP = 1、TIDYING = 2、TERMINATED = 3 
        //线程池至少是shutdown状态
        if (rs >= SHUTDOWN &&
          // 除了线程池正在关闭(shutdown),
          // 队列里还有未处理的task的情况,其他都不能添加
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //内层循环判断是否到达容量上限,worker+1
            for (;;) {
                int wc = workerCountOf(c);//worker数量
                //worker大于Integer最大上限
                //或到达边界上限
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS worker+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;//成功了跳出循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) 
                //如果线程池状态发生变化,重试外层循环
                    continue retry;
                // else CAS failed due to workerCount change;
                // retry inner loop 
         // CAS失败workerCount被其他线程改变,
         // 重新尝试内层循环CAS对workerCount+1

            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask); 
            //1.state置为-1,Worker继承了AbstractQueuedSynchronizer.
            //2.设置firstTask属性.
            //3.Worker实现了Runable接口,将this作为入参创建线程.
            final Thread t = w.thread;
            if (t != null) {
          //addWorker需要加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//workers是HashSet<Worker>
              //设置最大线程池大小
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

可以理解就是每一个Worker对象都是一个AQS队列哦!

addWorker方法有4种传参的方式:

  • addWorker(command, true)

    • 线程数小于corePoolSize。判断workers(HashSet<Worker>)大小,如果worker数量>=corePoolSize返回false,否则创建worker添加到workers,并执行worker的run方法(执行firstTask并轮询tworkQueue)
  • addWorker(command, false)

    • 线程数大于corePoolSize且workQueue已满。如果worker数量>=maximumPoolSize返回false,否则创建worker添加到workers,并执行worker的run方法(执行firstTask并轮询tworkQueue);
  • addWorker(null, false)

    • 没有worker存活也就是任务梳理runcount为0,创建worker去轮询workQueue,长度限制maximumPoolSize
  • addWorker(null, true)

    • 在execute方法中就使用了前3种,结合这个核心方法进行以下分析

以上无论哪种方式都需要进行相关的ReentrantLock的加锁,所以效率和性能不会特别好。所以有了一个小办法,prestartAllCoreThreads()

prestartAllCoreThreads()原理

这个方法调用,启动所有的核心线程去轮询workQueue。因为addWorker是需要上锁的,预启动核心线程可以提高执行效率。

ThreadPoolExecutor 内部类Worker (线程管理的核心类)

    /** 
   * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
   *  1.Worker类主要负责运行线程状态的控制。
   *  2.Worker继承了AQS实现了简单的获取锁和释放所的操作。来避免中断等待执行任务的线
   *  程时,中断正在运行中的线程(线程刚启动,还没开始执行任务)。
   *  3.自己实现不可重入锁,是为了避免在实现线程池控状态控制的方法,例如:
   *  setCorePoolSize的时候中断正在开始运行的线程。
     *  setCorePoolSize可能会调用interruptIdleWorkers(),该方法中会调用worker的tryLock()方法
      *  中断线程,自己实现锁可以确保工作线程启动之前不会被中断
     */

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
 
        /** Thread this worker is running in.  Null if factory fails. */
         封装任务线程机制。
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
        // inhibit interrupts until runWorker //状态置为-1,如果中断线程需要CAS将state 从0-
        >1,以此来保证能只中断从workerQueue getTask的线程
            setState(-1); 
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        // 核心方法机制
        /** Delegates main run loop to outer runWorker  */
        public void run() {
         	//首先执行w.unlock,就是把state置为0,对该线程的中断就可以进行了
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        
        // 在setCorePoolSize/shutdown等方法中断worker线程时需要调用该方法,
        // 确保中断的是从workerQueue getTask的线程
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        //调用tryRelease修改state=0,LockSupport.unpark(thread) 下一个等待锁的线程
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

workQueue 有多种选择,在 JDK 中一共提供了 7 中阻塞对列,分别为:

  • ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。 此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平地访问队列 ,所谓公平访问队列是指阻塞的线程,可按照阻塞的先后顺序访问队列。非公平性是对先等待的线程是不公平的,当队列可用时,阻塞的线程都可以竞争访问队列的资格。

  • LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。 此队列的默认和最大长度为Integer.MAX_VALUE。 此队列按照先进先出的原则对元素进行排序。

  • PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。 (虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败,导致 OutOfMemoryError)

  • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素

  • SynchronousQueue: 一个不存储元素的阻塞队列。 一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。(SynchronousQueue 该队列不保存元素)

  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。 相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。

  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。 是一个由链表结构组成的双向阻塞队列

关闭线程池

其实,如果优雅的关闭线程池是一个令人头疼的问题,线程开启是简单的,但是想要停止却不是那么容易的。通常而言, 大部分程序员都是使用 jdk 提供的两个方法来关闭线程池,他们分别是:shutdown 或 shutdownNow;

通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程(PS:中断,仅仅是给线程打上一个标记,并不是代表这个线程停止了,如果线程不响应中断,那么这个标记将毫无作用),所以无法响应中断的任务可能永远无法终止。 但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。 只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。 这里推荐使用稳妥的 shutdownNow 来关闭线程池,至于更优雅的方式我会在以后的并发编程设计模式中的两阶段终止模式中会再次详细介绍。

线程池的优点

在 Java 并发编程框架中的线程池是运用场景最多的技术,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来至少以下4个好处。

  • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  • 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行;
  • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
  • 第四:提供更强大的功能,比如延时定时线程池;

线程池的场景特性

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务;

  • 任务的优先级:高、中和低;

  • 任务的执行时间:长、中和短;

  • 任务的依赖性:是否依赖其他系统资源,如数据库连接;

性质不同的任务可以用不同规模的线程池分开处理。分为CPU密集型和IO密集型。

  • CPU密集型任务应配置尽可能小的线程,如配置 Ncpu+1个线程的线程池。 (可以通过Runtime.getRuntime().availableProcessors()来获取CPU物理核数),参考建议哈

  • IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2*Ncpu。

  • 混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。

如果这两个任务执行时间相差太大,则没必要进行分解。

可以通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue来处理。它可以让优先级高的任务先执行(注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行) 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点。方式因为提交的任务过多而导致 OOM;


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK