2

Java 线程池系列-第三篇

 6 months ago
source link: https://nicksxs.me/2024/02/24/Java-%E7%BA%BF%E7%A8%8B%E6%B1%A0%E7%B3%BB%E5%88%97-%E7%AC%AC%E4%B8%89%E7%AF%87/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Java 线程池系列-第三篇

2024-02-24Java 1 1

第三篇我们要来讲下 Worker 这个“打工人”,线程池里实际在干活的同学
Worker 实现了 Runnable 接口,所以在前面介绍的 addWorker
线程启动其实就调用了下面的 run 方法,而 run 方法则调用了内部的
runWorker 方法


private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
// 启动线程 Worker
public void run() {
runWorker(this);
}

介绍下这个 runWorker 方法
这里就会先处理 firstTask,如果 firstTask 是 null 则会去队列里取
里面会有两个钩子,一个是执行前,一个是执行后,注意这个循环是没有常规
退出逻辑的,说明是一直在往队列里获取,除非获取队列里的任务超时失败了
这样就到最后会去执行 processWorkerExit

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// firstTask 不为空或者从队列里获取到任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置执行钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行完后的钩子
afterExecute(task, thrown);
}
} finally {
// 最后会执行到这,清空任务
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 循环的最后就会去退出Worker
processWorkerExit(w, completedAbruptly);
}
}

而这个 processWorkerExit 也有个比较特殊的逻辑
照理我们会想到这里可能就是线程在处理完队列里的任务
以后,就会判断下是否要退出,如果是核心线程的话就不用
如果是非核心线程就会退出了,但是这里其实有个替换逻辑
就是最后有个 addWorker 调用

private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
// 判断状态
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 核心线程是否也需要退出
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 这里代表非核心线程就可以不用替换,也就是回收线程了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 否则是替换线程
addWorker(null, false);
}
}

这里主要讲了线程 Worker 运行的逻辑


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK