线程池ThreadPoolExecutor源码解析

在源码解析之前,我们先思考一个问题:为什么要使用线程池?

如果不使用线程池,我们如何在程序中执行一些任务呢?

  1. 最显而易见的一种方式就是顺序执行,代码描述如下:
1
2
3
4
5
public static void main(String[] args) {
doTask1();
doTask2();
// ...
}
  1. 多线程执行,代码描述如下:
1
2
3
4
5
public static void main(String[] args) {
new Thread(Demo::doTask1).start();
new Thread(Demo::doTask2).start();
// ...
}

第一种方式编码简单,也不容易出错,但它的问题是效率不高,无法利用多核和系统调度的优势。第二种方式能利用多核的优势,为每个任务创建一个线程,由操作系统来调度任务,在任务不多的情况下它能很好的工作,但是当任务数量变多之后,线程创建销毁的性能损耗和线程的资源占用都将成为问题。

线程是宝贵的系统资源。

对于宝贵资源的使用,有一种通用的思想——池化。

这就是为什么我们要使用线程池的原因。下面这段代码是Java线程池ThreadPoolExecutor的基础使用案例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
4, // 核心池大小
6, // 最大池大小
1, SECONDS, // Worker线程超时时间
new ArrayBlockingQueue<>(4), // 工作队列
Thread::new, // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝任务的策略
);

// 提交task
threadPool.execute(Demo::doTask1);
threadPool.execute(Demo::doTask2);
// ...
threadPool.shutdown();
}

接下来我们根据上面这段示例代码,深入到源码,分析一下线程池的实现原理。

ThreadPoolExecutor的构造

上面的代码展示了ThreadPoolExecutor最基础的构造方法,一共有6个参数(明明是7个…),构造方法里面都是一些初始化操作,不在赘述,重点关注一下这6个参数。这里我先列举这些核心参数的用途,在后面源码分析的过程中我们会频繁看到这些参数的身影。

  • corePoolSize 线程池正常运行时池的大小,这里称之为核心池
  • maximumPoolSize 线程池能扩展到的最大线程数量,在核心池和工作队列都满的时候扩展,为了方便理解,这里将扩展出来的部分称之为临时池,也就是线程池=核心池+临时池
  • keepAliveTime + TimeUnit 临时池或者核心池(当设置allowCoreThreadTimeOut为true时)线程超时时间,线程超过这个时间没有处理任务就会退出
  • workQueue 工作队列(阻塞的),核心池的Worker线程全部启动的情况下,会将任务放到这个队列
  • threadFactory 线程工厂,用于创建Worker线程
  • RejectedExecutionHandler 拒绝策略,当核心池、工作队列和临时池都满了的情况下,或者线程池不是RUNNING状态都会调用拒绝策略

提交的任务是如何被执行的

ThreadPoolExecutor提交任务的核心方法只有一个,就是execute方法。其他的如submitinvoke方法都是在其父类AbstractExecutorService中使用模板方法模式实现的,归根结底还是调用了execute方法。

所以我们重点关注execute方法提交一个任务后都发生了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // ① 核心池还没有满
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // ② 核心池满了,工作队列没满
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command); // 线程池不是RUNNING状态了,执行拒绝策略
else if (workerCountOf(recheck) == 0)
// 重新检查,核心池线程可能挂了,添加一个新的,
// 但是不用设置firstTask(因为task已经添加到workQueue了)
addWorker(null, false);
}
else if (!addWorker(command, false)) // ③ 核心池和工作队列都满了
reject(command); // 全都满了,或者线程池已经不是RUNNING状态了,执行拒绝策略
}

从上面的源码可以看出,提交一个任务后,主要有3个分支,接下来我们详细分析这3种情况都做了哪些事情。

1. 核心池还没有满

此时workerCount < corePoolSize,工作队列还是空的。

通过上面execute的源码①的位置,调用了addWorker方法,其源码如下所示。这个方法比较长,我添加了比较详细的注释,后面还有很多地方会遇到这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// ① 线程池状态不是RUNNING的时候不能添加新的Worker线程
// 但是有一种情况例外 就是 rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
// 这是因为,处于SHUTDOWN状态下,线程池不在接收新任务,但是需要处理工作队列里面剩余的任务。
// 此时如果一个Worker线程因为用户任务异常而终止,允许添加新的Worker线程,以尽快处理完工作队列的任务。
// processWorkerExit()方法里面,调用addWorker(null, false);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// ② workerCount不能超过最大值CAPACITY(29个1)
// 不能超过corePoolSize或者maximumPoolSize(取决于core参数)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// ③ CAS更新一下workerCount,如果失败就重试(内层循环),成功了就执行下面创建线程的代码
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// ④ 重新检查一下线程池状态,如果改变了,需要跳到外层循环,再来一次
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// ⑥ 新建一个Worker线程,启动后,先执行firstTask,然后轮询工作队列,执行任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// ⑦ 线程池必须是RUNNING状态
// 或者SHUTDOWN状态(对应上面的线程因为用户任务异常终止的情况)
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// ⑧ 如果worker添加成功了,启动线程,接下来Worker::run的代码就在新线程里面跑起来了
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// ⑨ 启动失败有下面几种可能
// 1. new Worker 的时候可能因为 ThreadFactory::newThread 抛出异常
// 2. 线程池状态不是RUNNING了
// 3. 新线程已经被别人启动了,抛出IllegalThreadStateException异常
if (! workerStarted)
// ⑩ 这里就是移除worker、workerCount减一,然后尝试终止线程池,就不多哔哔了
addWorkerFailed(w);
}
return workerStarted;
}

上面的代码比较多,但是逻辑其实比较简单,当核心池还没满的时候,就创建一个新的Worker线程,添加到核心池里,然后启动线程,这个线程执行的第一个任务就是我们提交的这个任务,然后线程会不停的从工作队列中取任务执行。整个流程如下图所示:

其中runWorker就是具体执行任务的方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// ① 第一次执行firstTask,然后从workQueue中取任务,
// 如果取出来的是null(超时了或被中断了),当前线程就要结束了
while (task != null || (task = getTask()) != null) {
// ② 加锁之前,可能会被shudown()方法中断(SHUTDOWN状态),
// 但加锁之后,不会响应shutdown()的中断,
// 总是会响应shutdownNow()的中断(STOP状态)
w.lock();
// ③ 如果是SHUTDOWN状态,清空中断状态,希望用户任务继续执行;
// 如果是STOP状态,中断当前线程,希望当前用户任务停止;
// 如果用户任务里面没有实现中断处理,这个任务还是会正常执行完
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // hook 可以自己实现,可能抛出异常
Throwable thrown = null;
try {
task.run(); // ④ 执行用户任务,可能抛出异常
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // hook 可以自己实现,可能抛出异常
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// ⑤ 如果completedAbruptly=true,说明用户任务抛出了异常。
// 需要新建一个Worker替换现有的,否则删除这个Worker
processWorkerExit(w, completedAbruptly);
}
}

从上面的代码我们发现,每次执行任务之前都会加锁,这难道不会影响性能吗?其实并不会,因为这里几乎不会有锁竞争(只有其他线程调用shutdown时才有微弱的竞争),lock相当于退化成了一次CAS操作,不会影响性能。

2. 核心池满了,工作队列没满

此时workerCount == corePoolSize,工作队列还能offer成功。

通过上面execute方法的源码②位置可以发现,这种情况只是将当前任务提交给工作队列,然后在某个核心池Worker线程执行完上一个任务之后,取出执行。如下图所示。

其中通过getTask方法轮询工作队列,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// ① 有两种情况可以继续取任务,其他情况直接返回null
// 1. 线程池状态为RUNNING
// 2. 线程池状态为SHUTDOWN并且工作队列不为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// ② allowCoreThreadTimeOut设置为true的时候才允许核心池超时,否则只允许临时池超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// ③ 这个地方比较有意思,在以下几种情况下会返回null
// 1. workerCount大于maximumPoolSize(通过setMaximumPoolSize方法),返回null
// 2. poll超时了并且当前线程不是池里最后一个线程,返回null
// 3. poll超时了并且当前线程是池里最后一个线程,此时如果workQueue为空,直接返回null,
// 否则还需要继续执行完workQueue里面的任务,毕竟你是最后的火种了啊
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// ④ 允许超时调用poll超时等待,否则调用take一直等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; // ⑤ 已经超时了,下次循环处理
} catch (InterruptedException retry) {
timedOut = false; // ⑥ 被中断了,说明还没有超时额
}
}
}

3. 核心池和工作队列都满了

此时workerCount >= corePoolSize,并且工作队列offer会返回false。

这种情况因为工作队列都满了,说明核心池的线程们已经跟不上任务提交的速度了,所以我们需要更多的线程处理任务。考虑到可能过一段时间任务提交的就没有这么频繁了,核心池又能跟上任务提交的速度了,那么新增的这些线程就有些浪费了,所以这部分线程应该是能被回收的,从getTask方法的源码可以发现,这一部分线程确实是允许超时的。为了便于区分,我们将超出核心池的部分称之为临时池。

通过上面execute源码③位置可以发现,调用addWorker方法并设置core参数为false,然后和第一种情况类似,会新建一个Worker线程,添加到池中,执行当前任务。

通过上面的分析,我们可以得出以下结论:

当我们提交一个任务到线程池时,如果核心池线程没满,就新建一个核心池线程并执行当前任务;如果核心池满了,就提交任务给工作队列,队列中的任务由核心池线程轮询执行;如果队列也满了,说明压力有点大,需要扩展核心池,新建一个临时池线程并执行当前任务,此时工作队列中的任务由核心池和临时池共同轮询执行。

线程池状态转移

通过上面的一些源码我们发现很多地方都有对线程池状态的判断,所以弄清楚线程池状态的转移,更有利于我们理解这些源码。

Doug Lea 大佬为线程池定义了5个状态,分别是:

  • RUNNING 线程池正常运行状态,接收新任务,处理工作队列的任务
  • SHUTDOWN 停止接收新任务,但是会继续处理工作队列的任务。调用shutdown方法会转移到这个状态
  • STOP 停止接收新任务,排空工作队列,但是会执行每个线程当前的任务。调用shutdownNow方法会转移到这个状态
  • TIDYING 当工作队列排空并且所有Worker线程都终止进入此状态,这个状态下会调用terminated方法
  • TERMINATED 当terminated方法执行完成时进入此状态,线程池的最终状态

这些状态的转移都是不可逆的,状态转移图如下所示:

state和workerCount存储在同一个字段ctl中,5个状态至少需要3个bit位才能存的下,所以用ctl的高3位存储state,低29位存储workerCount。

1
2
3
4
5
6
7
8
// 高3位存储state,低29位存储workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 高3位 1 1 1
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高3位 0 0 0
private static final int STOP = 1 << COUNT_BITS; // 高3位 0 0 1
private static final int TIDYING = 2 << COUNT_BITS; // 高3位 0 1 0
private static final int TERMINATED = 3 << COUNT_BITS; // 高3位 0 1 1

通过调用shutdown方法转移到SHUTDOWN状态,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 设置状态为SHUTDOWN
interruptIdleWorkers(); // 中断空闲(阻塞在take或poll)的Worker线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试转移到TERMINATED状态
}

代码如下所示:

advanceRunState执行状态转移之后,通过对上面addWorker方法的源码①的分析,可以知道SHUTDOWN状态不再接收新任务了。然后尝试中断空闲的Worker线程,代码如下,此时onlyOne为false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果tryLock加锁成功,说明线程可能阻塞在getTask上,中断它
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

如果这里成功中断了线程,通过对上面runWorker方法的源码③和getTask源码①⑥的分析,可以知道SHUTDOWN状态还会继续执行工作队列里面的任务。

最后会调用tryTerminate方法,尝试过渡到TERMINATED状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// ① 以下三种情况直接返回
// 1. 线程池处于RUNNING状态,不允许从RUNNING直接转移到TIDYING;
// 2. 已经是TIDYING或者TERMINATED状态了,不要做重复的事情;
// 3. 处于SHUTDOWN状态,但是workQueue里还有任务,需要先处理完排队的任务。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// ② 可能处于STOP状态或者SHUTDOWN状态(队列空了),此时有资格转移状态到TERMINATED了,
// 但是还有Worker线程没执行完毕,或者阻塞在getTask上了,中断它们。
if (workerCountOf(c) != 0) { // Eligible to terminate
// ③ 这里只中断其中一个线程,通过processWorkerExit传播给其他线程
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// ④ 转移到TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // hook 回收资源的机会,覆盖它
} finally {
// ⑤ 转移到TERMINATED状态,通知阻塞在awaitTermination的线程,天亮了
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

shutdowNow方法与shutdown类似,不过它会转移到STOP状态并强制中断Worker线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 转移到STOP状态
interruptWorkers(); // 中断Worker线程
tasks = drainQueue(); // 将工作队列排空,作为返回值
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试转移到TERMINATED状态
return tasks;
}

其中interruptWorkers会调用所有Worker的interruptIfStarted方法。

1
2
3
4
5
6
7
8
9
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

注意interruptIfStarted方法和interruptIdleWorkers的区别,这里不需要tryLock,直接中断,所以在用户任务中是可以接收到中断“信号”的。

getTask的源码①可以知道,处于STOP状态时不再处理工作队列的任务。

此时,所有的Worker会将当前正在执行的任务执行完,然后终止线程。在线程终止之前,需要执行processWorkerExit方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 如果当前worker是最后一个,就能成功过渡到TERMINATED状态
// 否则继续中断其他worker,参考tryTerminate的源码②
tryTerminate();

int c = ctl.get();
// 如果线程池处于RUNNING状态或者SHUTDOWN状态(队列不空),
// 说明当前worker因为用户任务的异常终止,需要新建一个worker替换之前的
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

最终所有的Worker线程都终止了,workerCount等于0了,并且工作队列也为空了,线程池成功过渡到TERMINATED状态,整个线程池的生命周期也就结束了。

本文标题:线程池ThreadPoolExecutor源码解析

文章作者:山坡杨

发布时间:2020年10月12日 - 19:06:32

最后更新:2020年10月14日 - 09:26:05

原始链接:http://www.yangxf.top/15/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

感觉本站内容不错,读后有收获?
0%