线程池探幽(一)

探幽一词,初见于C++ Primer,表示对一个事物进行深刻的了解。

前文优雅的使用线程池中,介绍了线程池使用上面的tips,以及指出了juc中Executors工厂的问题所在。
了解过上文的内容后,可以看出Java中线程池的实现核心类便是 ThreadPoolExecutor

本文将从 ThreadPoolExecutor 的详细实现谈起,以避免使用线程池给程序的隐患。

为什么使用线程池

说到为什么使用线程池,其实最根本的原因是因为创建/销毁线程是十分占用资源的,同时执行相同任务的多个线程也需要进行统一的管理。在此场景下,享元模式的设计思想,即用共享技术有效的支持大量的细粒度对象,便可以发挥作用,线程池也就应运而生。

通过线程池的使用,降低了创造/销毁线程的成本,也降低了对于多个线程的维护成本。线程池主要实现了对线程资源的持有,以及对于任务的调度。如上文所提,Java通过ThreadPoolExecutor来实现线程池,那么接下来就一点一点的看一下ThreadPoolExecutor的实现。

ThreadPoolExecutor

前序

Java源码中,类头的JavaDoc通常包含有该类的基本特性以及概览信息,对ThreadPoolExecutor的解读也从这开始。

  • ThreadPoolExecutor是ExecutorService的一个实现,通过使用池化共享的线程池,来执行任务,通常使用Executors提供的工厂方法来构造;
  • 线程池声明了如下两个问题:由于降低了每个任务实际执行前的开销,在大量异步任务的场景下,使用线程池通常会提供更好的性能;同时其提供了一种将任务集合和资源(线程)绑定的策略,除此之外ThreadPoolExecutor还提供一些基本的统计信息,例如完成任务数等;
  • ThreadPoolExecutor类提供了很多钩子用于扩展,推荐使用Executors工厂类进行使用,如果需要扩展,需要注意遵循以下指导。

ThreadPoolExecutor类的JavaDoc主要分为三部分,在这里我们省略了第三部门的具体关于参数的最佳实践并在后文专门讨论。

从三段文字中,可以解读内容如下:

  • ThreadPoolExecutor类实现了 ExecutorService的接口,可以通过其submit接口进行任务的递交,本质上是一个 任务执行器 的线程池实现
  • 线程池在大量任务场景下效率较好,是由于减少了每次创建/销毁线程的开销
  • 由于参数复杂,不建议自己直接创建ThreadPoolExecutor实例

ThreadPoolExecutor的初始化

官方Doc中至少在jdk1.8中仍建议使用Executors工厂来屏蔽具体线程池创建的参数细节,但是该工厂隐患较大,容易造成内存OOM的风险,故实际使用中,还是应该直接使用ThreadPoolExecutor来使用线程池,使用的最开始,自然是从构造方法开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

由于在之前文章中已经介绍过参数的意义,故在此不进行赘述。简单的来说,此类的构造方法只是通过参数进行基本成员的初始化,并无其他操作信息,值得注意的是,在构造方法里并没有创建新线程。结合JavaDoc注释,线程池在执行任务,且当前核心线程数少于指定的最大核心线程数时,会进行线程的创建。这也侧面佐证了为什么定义中提到“大量”任务场景下性能较优,因为在线程池接受的任务少于核心线程数的时候,仍然每次执行任务要创建线程,造成额外的开销。

提交任务

对于线程池,最核心的就是提交任务和执行的能力。从源码中可以看出,ThreadPoolExecutor 中与提交相关的接口定义位于其实现的 ExecutorService 接口中。
ExecutorService 接口的定义中,官方提到:ExecutorService 是一个提供了终止管理和可通过 Future 机制追踪异步任务执行情况的 Executor(执行器)。ExecutorService中定义了多种重载的 submit 方法用于任务的提交,具体如下:

  • Future submit(Callable task): 提交有返回值的任务,并可通过Future获取返回值(一旦任务执行完成,便可以通过Future#get获取到执行结果)
  • Future submit(Runnable task, T result):提交无返回值任务和返回值
  • Future submit(Runnable task):提交无返回值任务

以上三个方法便为核心的提交任务方法,除此之外 ExecutorService 还进行了 invokeAll 的多种重载定义,用来批量提交任务。

submit 方法

在多种重载下,我们从最常用的 submit(Runnable) 开始看起。ThreadPoolExecutor 中并没有直接实现该方法,其是由 ThreadPoolExecutor 的父类 AbstractExecutorService 实现。该方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
```
由上文中方法定义看出,submit 方法做的事情只有如下三步骤:
- 将 Runnable 任务包装成 RunnableFuture
- 调用 execute 方法提交并执行
- 将根据传入 Runnable 包装的 RunnableFuture 返回
由上面的简单定义,可以看到 submit 方法是通过将传入的真实任务进行包装,包装成 Future 的子类后提交执行并将其返回,接下来 newTaskFor 方法的定义如下:
```Java
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

该方法定义也比较简单,相当于是对runnable进行包装,然后返回 FutureTask 供执行并返回(在此先暂时省略掉有关于 Future 概念及实现的介绍)。

重新回到 submit 方法的定义,在获取到包装的 RunnableFuture 后,会通过execute(Runnable)方法真正的提交任务到具体执行逻辑。

execute 方法

Executor中,定义了 execute(Runnable) 方法。该方法的定义表达了该方法的目的是用于在未来某个时间执行一个任务,执行任务的方式可以是用线程池中创建好的线程、创建新的线程等等。该方法在具体实现中,可以根据实际的需求进行实现,我们此刻关心的便是 ThreadPoolExecutor 中该方法的实现。实现具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

以上短短不到20行的主要代码便是线程池特有的任务执行逻辑实现,由代码和注释可以看出主要有三部分策略:

  • 判断正在运行的线程数是否比配置的核心线程数少,如果少的话则直接创建新线程并将该任务提交给这一线程执行,这部分具体逻辑由addWorker(Runnable, boolean) 方法来实现,如果成功,流程结束;
  • 运行线程已经等于配置的核心线程数后,会尝试将任务放入线程池等待队列,成功后仍然会通过双检锁来判断是否需要启动新线程以及判断线程池是否已经停止。
  • 如果任务放入等待队列失败,仍然会继续尝试去添加新线程,如果还是失败,那就真的说明线程池已经饱和或者已经停止了,任务被拒绝

从以上三个流程,可以了解到线程池执行的原理,对线程池执行任务的思路有基本的掌握。

小结

通过通篇的简单介绍,相信在看的你已经对线程池有了一些了解。ThreadPoolExecutor作为JUC中的经典实现,其多处涉及到多线程同步的操作(例如并发取任务的锁控制、提交任务时的检测机制)等,均有着可取之处。同时其submit与execute方法也是一个比较容易争论的点。线程池的神秘面纱也只是揭开了冰山一角,剩下的更深层次的源码阅读还会继续进行。

接下来对线程池的理解,将分为如下几个议题开启:

  • ctl的巧妙设计
  • execute与submit深入了解
  • ThreadPoolExecutor的并发设计