优雅的使用线程池

从线程池使用谈起

创建并使用一个线程池

线程池这一概念,想必所有开发者都不陌生。它的应用场景十分广泛,可以被广泛的用于高并发的处理场景。Java 在 juc 包内提供了许多线程池相关的类,可以帮我们快速的构建一个线程池。目前 juc 提供的 Executors 工厂类,可以方便的创建线程池,其提供了创建无限大的线程池、指定大小线程池、定时调度线程池以及单个线程池等等,我们可以通过以下代码简单的创建一个线程池。

1
2
3
4
//创建一个不限制线程个数的线程池
ExecutorService executor = Executors.newCachedThreadPool();
//创建一个固定线程个数的线程池
ExecutorService executor = Executors.newFixedThreadPool(10);

同样,Executors提供的工厂方法中,我们也可以通过传入一个 ThreadFactory 来自定义线程创建时的一些属性,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("worker-thread-" + UUID.randomUUID().toString());
return t;
}
});
//使用lambda可替换为
private static ExecutorService exec = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r);
t.setName("worker-thread-" + UUID.randomUUID().toString());
return t;
});

以上代码可以看出,通过Executors提供的工厂方法,我们可以很简单的创建一个线程池来使用。通过 ExecutorService 提供的 submit(Runnable) 接口,即可简单的向线程池提交任务。

CachedThreadPool 与 FixedThreadPool

作为最常用的两种线程池,CachedThreadPool 和 FixedThreadPool 在不同场景,有着不同的应用。

  • CachedThreadPool 主要被应用在响应时间要求高、数据量可控的场景,由于其不限制创建线程的个数,故若数据量不可控,会造成程序 OOM
  • FixedThreadPool 主要被应用在线程资源有限,数据量较小或不可控场景,由于其线程数量有限,针对于过多的数据量,默认将会进行丢弃,但是不会造成程序 OOM
    我们可以通过自己的实际场景需求,选择不同的线程池。

ThreadPoolExecutor

Executors 工厂的实现

Executors仅是一个工厂类,查看它的实现我们便可以看到,其不同线程池(暂时不考虑调度线程池)的底层均为 ThreadPoolExecutor 实例,通过不同的初始化参数形成了不同的特性,CachedThreadPool 和 FixedThreadPool 的创建部分源码如下。

1
2
3
4
5
6
7
8
9
10
11
12
//创建CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//创建FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor 的构建

以上便是工厂 Executors 中创建线程池的具体实现。从实现代码中,可以看出,不同特性的线程池本质都是构建 ThreadPoolExecutor 对象。查看 ThreadPoolExecutor 类的源码可以看到,其构造方法定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造参数简介如下:

  • corePoolSize:线程池中一直有的线程个数,默认情况下即使空闲也不会被回收(可以通过设置allowCoreThreadTimeOut参数来改变默认)
  • maximumPoolSize:线程池中可以持有的最多线程数
  • keepAliveTime:超过corePoolSize数的空闲线程在被销毁之前等待新任务到达的最长时间
  • unit:keepAliveTime参数的单位
  • workQueue:线程池的等待队列,被execute方法提交的任务将进入这一队列,默认无限大
  • threadFactory:线程工厂,可以自定义线程的创建过程
  • handler:拒绝处理器,负责在workQueue满的时候处理新提交的任务

反观Executors工厂的实现,可以看出,针对于 FixedThreadPool 的创建,其实就是创建一个核心线程和最大线程均为固定值的线程池,以保证只有固定个线程提供服务;针对于 CachedThreadPool 的创建,则是创建一个核心线程数为0、最大线程数为整型最大值的线程池。

Executors存在的问题

由于篇幅有限,此处仅针对日常使用最多的 CachedThreadPool 和 FixedThreadPool 两种线程池进行问题分析。

FixedThreadPool

再次回顾FixedThreadPool的定义,代码如下。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

由代码可以看到,除却核心线程和最大线程数都设置为固定值,FixedThreadPool还使用了一个无长度限制的等待队列。
在使用上,通常都会认为FixedThreadPool是不会占过多资源的。但是使用中,FixedThreadPool仍然会可能出现 OOM 的风险。这是因为,由于FixedThreadPool采用无界的等待队列,一旦空闲线程被用尽,就会向队列中加入任务,这时一旦任务进入速度远高于线程处理能力,就有出现 OOM 的可能。
阿里巴巴编码规范中,也有关于线程池的使用说明。其建议通过直接定义 ThreadPoolExecutor 来代替使用 Executors 提供的工厂方法。在我们处理的数据量较大或者并发量很大时,应避免直接使用 Executors 提供的 FixedThreadPool。

CachedThreadPool

CachedThreadPool的定义在前文中也已经介绍过,具体如下。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

由代码可以看到,CachedThreadPool将空闲线程销毁前的等待时间设置成了60s,同时采用SynchronousQueue,不进行等待队列的设置。
CachedThreadPool 在一定程度上能够应对不间断突增的并发量,但是一旦对总量把控不好,就容易引发OOM。

线程池提交任务实现阻塞等待

由于 FixedThreadPool 因为等待队列无限大可能会导致OOM,所以我们可以通过直接创建 ThreadPoolExecutor 来替代使用 Executors.newFixedThreadPool,在构建过程中通过指定等待队列大小,来避免出现OOM。但是由于 ThreadPoolExecutor 在等待队列满时,会拒绝任务插入并直接丢弃,所以针对于不可以丢弃的任务,就不能简单的采用这种方式。
例如,一个 Consumer 在不断的消费 MQ,并希望通过不同的 Worker 线程来并发处理。如果采用上述方案,那么在消费速率快,Worker 线程池等待队列慢的情况下,就会发生丢数据,这显然是我们不想看到的。在更多时候,我们需要的都是一个可以阻塞等待的线程池。

变更拒绝策略

说到让线程池提交任务阻塞等待,最简单的方式就是通过增加一个拒绝策略,该策略中做的便是对等待队列进行阻塞写入,也就实现了线程池提交任务的阻塞等待,具体如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* @author jayden
*/
public class ExecutorsDemo {
private static ExecutorService exec = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), r -> {
Thread t = new Thread(r);
t.setName("worker-thread-" + UUID.randomUUID().toString());
return t;
}, (r, executor) -> {
if (!executor.isShutdown()) {
try {
//阻塞等待put操作
System.err.println("waiting queue is full, putting...");
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
private static AtomicInteger at = new AtomicInteger(0);
public static void main(String[] args) {
while (true) {
exec.submit(() -> {
System.err.println("Worker" + at.getAndIncrement() + " start.");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("Worker end.");
});
}
}
}

以上程序中,通过直接初始化 ThreadPoolExecutor 并指定拒绝策略的方式,来实现线程池任务的阻塞提交。

使用信号量

为每个需要阻塞的线程池增加拒绝策略的这种方式虽然可行,但是每次初始化都要添加重复代码明显感觉不太优雅。在思考如何能优雅的实现时,想到可以添加一层代理。代理类持有真正的线程池,同时持有信号量。通过信号量来控制线程池任务的提交,不改变原有线程池的定义。具体如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* @author jayden
*/
public class BlockedThreadPool {
private ExecutorService executor;
private Semaphore semaphore;
/**
* 接收两个参数,最大允许线程数,自定义线程名
*
* @param nThreads
* @param name
*/
private BlockedThreadPool(int nThreads, String name) {
if (nThreads <= 0) {
throw new IllegalArgumentException();
}
semaphore = new Semaphore(nThreads);
executor = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r);
t.setName(name + UUID.randomUUID().toString());
return t;
});
}
/**
* 提供工厂方法
*
* @param nThread
* @param name
*/
public static BlockedThreadPool createBlockedThreadPool(int nThread, String name) {
return new BlockedThreadPool(nThread, name);
}
/**
* 向线程池提交任务
* @param r
*/
public void submit(Runnable r) {
executor.submit(() -> {
try {
semaphore.acquire();
r.run();
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}

在该示例中,使用了 Executors.newCachedThreadPool 来构造真正线程池实例,由于这种实现是通过信号量来控制并发及阻塞的,故不需要在线程池本身层面进行限制设置。
上文代码仅是一个示例demo,根据不同业务场景,还需要进一步的抽象及扩展。

总结

本文中所考虑的问题最初来自于Kafka消费服务在生产环境的一次OOM。在最初使用 Executors.newFixedCachePool 方法构建线程池中,虽然指定了线程池大小但是还是出现了OOM,经调查才搞明白具体的原因。本文意在抛砖引玉,仅是针对于单一场景的一个总结。JUC 博大精深,还需要继续深入探究。本文中示例代码,请参考java-demo/threadpool