从线程池使用谈起
创建并使用一个线程池
线程池这一概念,想必所有开发者都不陌生。它的应用场景十分广泛,可以被广泛的用于高并发的处理场景。Java 在 juc
包内提供了许多线程池相关的类,可以帮我们快速的构建一个线程池。目前 juc 提供的 Executors 工厂类,可以方便的创建线程池,其提供了创建无限大的线程池、指定大小线程池、定时调度线程池以及单个线程池等等,我们可以通过以下代码简单的创建一个线程池。
同样,Executors提供的工厂方法中,我们也可以通过传入一个 ThreadFactory
来自定义线程创建时的一些属性,如下。
以上代码可以看出,通过Executors提供的工厂方法,我们可以很简单的创建一个线程池来使用。通过 ExecutorService 提供的 submit(Runnable) 接口,即可简单的向线程池提交任务。
CachedThreadPool 与 FixedThreadPool
作为最常用的两种线程池,CachedThreadPool 和 FixedThreadPool 在不同场景,有着不同的应用。
- CachedThreadPool 主要被应用在响应时间要求高、数据量可控的场景,由于其不限制创建线程的个数,故若数据量不可控,会造成程序 OOM
- FixedThreadPool 主要被应用在线程资源有限,数据量较小或不可控场景,由于其线程数量有限,针对于过多的数据量,默认将会进行丢弃,但是不会造成程序 OOM
我们可以通过自己的实际场景需求,选择不同的线程池。
ThreadPoolExecutor
Executors 工厂的实现
Executors仅是一个工厂类,查看它的实现我们便可以看到,其不同线程池(暂时不考虑调度线程池)的底层均为 ThreadPoolExecutor
实例,通过不同的初始化参数形成了不同的特性,CachedThreadPool 和 FixedThreadPool 的创建部分源码如下。
ThreadPoolExecutor 的构建
以上便是工厂 Executors 中创建线程池的具体实现。从实现代码中,可以看出,不同特性的线程池本质都是构建 ThreadPoolExecutor 对象。查看 ThreadPoolExecutor 类的源码可以看到,其构造方法定义如下。
构造参数简介如下:
- corePoolSize:线程池中一直有的线程个数,默认情况下即使空闲也不会被回收(可以通过设置allowCoreThreadTimeOut参数来改变默认)
- maximumPoolSize:线程池中可以持有的最多线程数
- keepAliveTime:超过corePoolSize数的空闲线程在被销毁之前等待新任务到达的最长时间
- unit:keepAliveTime参数的单位
- workQueue:线程池的等待队列,被execute方法提交的任务将进入这一队列,默认无限大
- threadFactory:线程工厂,可以自定义线程的创建过程
- handler:拒绝处理器,负责在workQueue满的时候处理新提交的任务
反观Executors工厂的实现,可以看出,针对于 FixedThreadPool
的创建,其实就是创建一个核心线程和最大线程均为固定值的线程池,以保证只有固定个线程提供服务;针对于 CachedThreadPool
的创建,则是创建一个核心线程数为0、最大线程数为整型最大值的线程池。
Executors存在的问题
由于篇幅有限,此处仅针对日常使用最多的 CachedThreadPool 和 FixedThreadPool 两种线程池进行问题分析。
FixedThreadPool
再次回顾FixedThreadPool的定义,代码如下。
由代码可以看到,除却核心线程和最大线程数都设置为固定值,FixedThreadPool还使用了一个无长度限制的等待队列。
在使用上,通常都会认为FixedThreadPool是不会占过多资源的。但是使用中,FixedThreadPool仍然会可能出现 OOM 的风险。这是因为,由于FixedThreadPool采用无界的等待队列,一旦空闲线程被用尽,就会向队列中加入任务,这时一旦任务进入速度远高于线程处理能力,就有出现 OOM 的可能。
阿里巴巴编码规范中,也有关于线程池的使用说明。其建议通过直接定义 ThreadPoolExecutor 来代替使用 Executors 提供的工厂方法。在我们处理的数据量较大或者并发量很大时,应避免直接使用 Executors 提供的 FixedThreadPool。
CachedThreadPool
CachedThreadPool的定义在前文中也已经介绍过,具体如下。
由代码可以看到,CachedThreadPool将空闲线程销毁前的等待时间设置成了60s,同时采用SynchronousQueue,不进行等待队列的设置。
CachedThreadPool 在一定程度上能够应对不间断突增的并发量,但是一旦对总量把控不好,就容易引发OOM。
线程池提交任务实现阻塞等待
由于 FixedThreadPool 因为等待队列无限大可能会导致OOM,所以我们可以通过直接创建 ThreadPoolExecutor 来替代使用 Executors.newFixedThreadPool,在构建过程中通过指定等待队列大小,来避免出现OOM。但是由于 ThreadPoolExecutor 在等待队列满时,会拒绝任务插入并直接丢弃,所以针对于不可以丢弃的任务,就不能简单的采用这种方式。
例如,一个 Consumer 在不断的消费 MQ,并希望通过不同的 Worker 线程来并发处理。如果采用上述方案,那么在消费速率快,Worker 线程池等待队列慢的情况下,就会发生丢数据,这显然是我们不想看到的。在更多时候,我们需要的都是一个可以阻塞等待的线程池。
变更拒绝策略
说到让线程池提交任务阻塞等待,最简单的方式就是通过增加一个拒绝策略,该策略中做的便是对等待队列进行阻塞写入,也就实现了线程池提交任务的阻塞等待,具体如下。
以上程序中,通过直接初始化 ThreadPoolExecutor 并指定拒绝策略的方式,来实现线程池任务的阻塞提交。
使用信号量
为每个需要阻塞的线程池增加拒绝策略的这种方式虽然可行,但是每次初始化都要添加重复代码明显感觉不太优雅。在思考如何能优雅的实现时,想到可以添加一层代理。代理类持有真正的线程池,同时持有信号量。通过信号量来控制线程池任务的提交,不改变原有线程池的定义。具体如下。
在该示例中,使用了 Executors.newCachedThreadPool
来构造真正线程池实例,由于这种实现是通过信号量来控制并发及阻塞的,故不需要在线程池本身层面进行限制设置。
上文代码仅是一个示例demo,根据不同业务场景,还需要进一步的抽象及扩展。
总结
本文中所考虑的问题最初来自于Kafka消费服务在生产环境的一次OOM。在最初使用 Executors.newFixedCachePool
方法构建线程池中,虽然指定了线程池大小但是还是出现了OOM,经调查才搞明白具体的原因。本文意在抛砖引玉,仅是针对于单一场景的一个总结。JUC 博大精深,还需要继续深入探究。本文中示例代码,请参考java-demo/threadpool