3

一文带你了解线程池原理 - bug的自我救赎

 1 year ago
source link: https://www.cnblogs.com/selfsalvation/p/17152669.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

一文带你了解线程池原理

1.使用线程池的意义何在?

​ 项目开发中,为了统一管理线程,并有效精准地进行排错,我们经常要求项目人员统一使用线程池去创建线程。因为我们是在受不了有些人动不动就去创建一个线程,使用的多了以后,一旦报错就只有一个线程报错信息,还是线程的共用信息,再加上如果你将异常吃了(捕获后不做处理)的情况下,这个错误。。。。em,我实在不知道去哪里排查,不然你换个人试试吧。

2.线程池的重要参数----你真的了解吗

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
  1. corePoolSize:核心线程数。设置核心线程数的意义何在?通俗来讲核心线程数就是正式员工,需要长期坚守岗位,有任务就需要执行。
  2. maximumPoolSize:最大线程池个数。设置最大线程池数量的意义何在?其实就是一个容错机制,当你的需要执行的线程个数已经爆满并且超过的时候,提供了一个容错机制,可以保证在短期内多余的任务正常执行。相当于就是临时工,临时过来执行任务,任务结束后就可以走了。
  3. keepAliveTime:保活的时间。设置的意义何在?当线程任务无剧增的情况下,维持在正常提亮。你无需那么多临时工来执行任务,所以规定时间,临时工可以走人了,也即是除核心线程外的线程可以回收了。
  4. TimeUnit:保活的时间单位。这个就不多赘述了。
  5. BlockingQueue:阻塞队列。设置阻塞队列的意义何在?当所有核心线程都正在工作时,将其放入阻塞队列,等待后续执行。也就是这个任务进行排队,等正式工忙完了继续做。
  6. ThreadFactory:线程工厂。生产线程,由你自己去定义你想生产什么样的线程。
  7. RejectedExecutionHandler:拒绝策略。当你的最大线程与阻塞队列都满了。这个时候,你已经接收不了新的任务进行处理了。所以设置拒绝策略。相当于就是我所有的员工和临时工都在工作了,并且排队的任务都满了,应对这样的情况,你打算如何做。

除此之外还有一个重要的参数:

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;//是否允许核心线程数超时退出。

该参数有在特定的业务场景下有很大的意义。比如:你的业务只在晚上需要执行,其余时间无需执行。那么为何不把资源让出来,白天的时候,可以让其他业务占有这些资源去执行呢。

3.ThreadExecutorPool线程池重要源码解析

989502-20230224175842970-1330496552.png

由该类图可知,Executor执行器定义执行方法,ExecutorService定义线程池操作的基本方法,AbstractExecutorService定义了线程池操作的方法模板。

ThreadPoolExecutor任务执行流程图

989502-20230224175854550-1392157689.jpg

1.首先是构造方法

​ 基本的参数校验与赋值,简单代码不过多赘述。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ////基本的参数校验
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)  
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

2.线程执行的方法

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);//将线程对象封装成RunnableFuture
        execute(ftask);//任务执行
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);//将线程对象封装成RunnableFuture
        execute(ftask);//任务执行
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);//将线程对象封装成RunnableFuture
        execute(ftask);//任务执行
        return ftask;
    }
 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//获取当前的线程池状态。单个参数,保存了线程池的状态以及线程数量
        if (workerCountOf(c) < corePoolSize) { //当线程数量小于核心线程数
            if (addWorker(command, true)) //直接添加任务,运行线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//如果核心线程数已经满了,那么直接添加到阻塞队列。
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//线程池不是running状态,执行拒绝策略。
                reject(command);
            else if (workerCountOf(recheck) == 0)//线程池线程数量不能为0,需要有一个线程对线程池的后续操作进行处理,比如关闭线程池
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//当核心线程与阻塞队列都满了的时候,直接添加任务到非核心线程运行。添加失败直接执行拒绝策略
            reject(command);
    }

1.关于ctl.get()方法的解释---利用了单个变量,保存了线程池状态以及线程数量的值

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; //运行状态 正常执行任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS; //关闭线程池,不再接收新任务
    private static final int STOP       =  1 << COUNT_BITS; //关闭线程池,所有任务停止
    private static final int TIDYING    =  2 << COUNT_BITS; //中间状态
    private static final int TERMINATED =  3 << COUNT_BITS; //线程池已经关闭
    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

2.addWorker方法

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();//获取ctl的快照保存在栈上
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&   //如果线程池已经关闭,或者(当前线程池关闭状态当前任务是空且当前工作队列不为空)不满足的情况下直接返回
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//CAS修改线程池ctl变量,增加线程数
                    break retry; //添加成功直接退出
                c = ctl.get();  // 添加不成功,为了保证多线程运行的安全性,重新获取
                if (runStateOf(c) != rs)//当前线程池状态发生改变
                    continue retry; //直接重新运行retry循环体
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //生成自定义的线程woker
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;//这个代码没有意义,mainLock定义的变量为final。可以直接使用
                mainLock.lock();//添加work使用锁,保证添加任务的原子性。
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN || //线程池处于running状态
                        (rs == SHUTDOWN && firstTask == null)) {//线程池处于showdown状态但是firstTask为空。
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)//保存当前线程池中线程的最大数量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {//添加成功,运行线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)//线程启动失败
                addWorkerFailed(w);//移除work,减少线程数量
        }
        return workerStarted;
    }

t.start()执行线程任务

//Worker类中实际执行任务的方法 
public void run() {
            runWorker(this);
        }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts  //将原始的线程状态为-1修改为0,后续通过getState()>=0获取线程是否已经运行的状态,允许线程中断。-1默认为初始化,此处需要进行处理
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//task不等于空直接运行,task等于空从workerQueue阻塞队列获取任务
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||//线程池运行状态大于等于STOP
                     (Thread.interrupted() && //线程是否已经被中断了
                      runStateAtLeast(ctl.get(), STOP))) &&//鲜橙汁运行状态大于等于STOP
                    !wt.isInterrupted())//判断任务的线程如果没有被中断
                    wt.interrupt();//中断当前任务线程
                try {
                    beforeExecute(wt, task);//钩子函数,实际任务运行之前做处理
                    Throwable thrown = null;
                    try {
                        task.run();//执行实际任务代码
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//钩子函数,实际任务运行之后做处理
                    }
                } finally {
                    task = null;//将任务置空
                    w.completedTasks++;//任务完成数加1
                    w.unlock();
                }
            }
            completedAbruptly = false;//执行过程中是否发成异常
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

//执行任务退出操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果有异常中断导致任务结束
            decrementWorkerCount();//将线程数量减1

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;//完成的任务数量累加
            workers.remove(w);//从workers的任务集合中移除当前任务
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//尝试关闭线程池

        int c = ctl.get();//获取当前线程池的最新状态
        if (runStateLessThan(c, STOP)) {//如果当前任务状态小于STOP
            if (!completedAbruptly) {//当前任务执行无异常发生
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//根据allowCoreThreadTimeOut参数获取最小的线程数量
                if (min == 0 && ! workQueue.isEmpty())//如果核心线程允许退出,并且工作队列不为空
                    min = 1;//设置最小值为1,因为最后需要有线程去执行线程池的后续处理,所有线程都没了,后续线程池退出无线程处理
                if (workerCountOf(c) >= min)//如果工作的线程数量大于等最小值
                    return; // replacement not needed  直接返回
            }
            addWorker(null, false);//如果当前线程数已经小于最小线程数,那么需要保证最小线程数在运行,所以需要有保证线程池的正常运行,添加一个空任务。
        }
    }
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();//获取当前线程池状态
            int rs = runStateOf(c);//获取当前运行状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//如果线程池状态大于等于SHUTDOWN并且(线程数量大于等于STOP或者工作队列为空)
                decrementWorkerCount();//将线程池中线程数量减1
                return null;
            }

            int wc = workerCountOf(c);//获取当前线程池的线程数量

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//判断是否运行核心线程数超时,判断是否需要超时机制

            if ((wc > maximumPoolSize || (timed && timedOut))//工作线程大于最大线程池数量或者允许超时并且有超时的情况
                && (wc > 1 || workQueue.isEmpty())) {//并且线程池线程数量大于1或者阻塞队列为空
                if (compareAndDecrementWorkerCount(c))//CAS操作将线程池数量减1
                    return null;//返回空
                continue;//CAS失败继续
            }

            try {
                Runnable r = timed ?//允许超时从队列中拿任务并等待keepAliveTime时间
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();阻塞等待
                if (r != null)//获取的任务不为空
                    return r;//直接返回
                timedOut = true;//如果为空,超时标志位为true
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

3.addWorkerFailed方法解析

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();//获取锁
        try {
            if (w != null)//work不是空
                workers.remove(w);//直接从workers中移除当前任务
            decrementWorkerCount();//加个ctl中的woker数量减少
            tryTerminate();//如果线程池已经是showdown状态,尝试让线程池停止。多线程协作的函数
        } finally {
            mainLock.unlock();
        }
    }

3.线程池关闭shutdown方法

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检查关闭权限,可以忽略
            advanceRunState(SHUTDOWN);//线程池状态递进,由running变为shutdown
            interruptIdleWorkers();//中断所有空闲线程
            onShutdown(); // hook for ScheduledThreadPoolExecutor钩子函数,调度线程池使用
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试将线程池关闭。
    }

1.advanceRunState方法解析

    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();//获取当前的线程状态
            if (runStateAtLeast(c, targetState) ||//当前状态已经是大于等于shutdown直接退出
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))//cas操作将线程状态改为targetState。
                break;
        }
    }

2.interruptIdleWorkers方法解析

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();//获取锁
        try {
            for (Worker w : workers) {//遍历works中所有的工作任务
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {//如果没有被中断过,并且可以获得锁,证明属于空闲线程
                    try {
                        t.interrupt();//将线程中断,打上中断标志位
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();//解锁
                    }
                }
                if (onlyOne)//只中断一个线程标识
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

4.shutdownNow方法解析

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//权限检查
            advanceRunState(STOP);//状态递进 详细方法见上面
            interruptWorkers();//中断所有启动的work线程
            tasks = drainQueue();//将所有未执行的任务出队保存
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试关闭线程池
        return tasks;
    }

1.interruptWorkers方法解析

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();//获取锁
        try {
            for (Worker w : workers)//遍历所有woker进行处理
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }


        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//当前work的状态大于0并且线程不为空且线程未被中断
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

使用getState() >= 0表示当前线程已经启动,runWorker方法中会将其状态从-1改变。证明线程已经启动
       Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

2.drainQueue方法解析

//标准的入队和出队功能不做过多注释   
private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
	

5.tryTerminate方法解析

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();//获取当前线程状态ctl
        if (isRunning(c) ||//线程池正在运行
            runStateAtLeast(c, TIDYING) ||//线程池状态大于等于TIDYING,有其他线程已经改变线程池状态为TIDYING或者TERMINATED了
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//线程池状态等于shutdown并且工作队列不为空。
            return;//以上三种情况线程池无法关闭,需要继续处理
        if (workerCountOf(c) != 0) { // Eligible to terminate//当前工作线程数量不等于0
            interruptIdleWorkers(ONLY_ONE);//中断线程且只中断一个
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//cas操作将线程池状态置为TIDYING
                try {
                    terminated();//线程池终止
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));//设置线程池状态为TERMINATED
                    termination.signalAll();//信号唤醒所有等待线程
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

​ 线程池的运用在项目中已经成为一种常态,作为一个开发人员最重要的了解其背后的设计原理以及流程,更好地运用线程池,方便提升项目程序的性能以及排查错误。在阅读对应的线程池源码时,我们只局限于单线程的思维,更多的是要去考虑当多线程并发执行时的临界条件。了解设计者的设计初衷、以及设计意图,能让你更好地在项目中运用并设计符合自己项目的线程池。以上是我个人对于线程池ThreadPoolExecutor的理解,不足之处,请多多指教。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK