Java高并发编程之线程池原理
本文不再对线程池原理进行细致的分析,更详细的线程池原理请参考《Java线程池实现原理(转美团技术团队)》
使用线程池主要有以下三个原因:
- 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
- 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
- 可以对线程做统一管理。
# 几个基本概念
# ThreadPoolExecutor提供的构造方法
ThreadPoolExecutor重载了四个构造方法,只是参数个数不一致,请读者自行分辨。以下提供全参构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
2
3
4
5
6
7
- corePoolSize:线程池中核心线程数最大值
提示
核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。
int maximumPoolSize:线程池中线程总数最大值。该值等于核心线程数量 + 非核心线程数量。
long keepAliveTime:非核心线程闲置超时时长。非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置
allowCoreThreadTimeOut(true),则会也作用于核心线程。TimeUnit unit:keepAliveTime的单位。TimeUnit是一个枚举类型 ,包括以下属性:
NANOSECONDS: 1微毫秒 = 1微秒 / 1000MICROSECONDS: 1微秒 = 1毫秒 / 1000MILLISECONDS: 1毫秒 = 1秒 /1000SECONDS: 秒MINUTES: 分HOURS: 小时DAYS: 天BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象。常用的几个阻塞队列:
LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的大小。SynchronousQueue:同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。DelayQueue:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
(可选)ThreadFactory threadFactory:创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
(可选)RejectedExecutionHandler handler:拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
# 六种常见的线程池
# newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2
3
4
5
CacheThreadPool的运行流程如下:
- 提交任务进线程池。
- 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
- 尝试将任务添加到SynchronousQueue队列。
- 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
- 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。
当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。
# newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2
3
4
5
核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
与CachedThreadPool的区别:
- 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。 而CachedThreadPool因为corePoolSize=0,所以只会创建非核心线程。
- 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
- 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool占用资源更多。
- 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CachedThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。
# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2
3
4
5
6
有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。
# newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize,
Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
2
3
4
5
6
7
8
9
10
11
# ForkJoinPool
ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。
与其他ExecutorServices的情况一样,下表总结了三个主要任务执行方法。 这些设计主要由尚未在当前池中进行fork / join计算的客户端使用。 这些方法的主要形式接受的实例ForkJoinTask ,但重载形式也允许的纯混合执行Runnable -或Callable -基础的活动为好。 但是,通常情况下,在池中已经执行的任务会使用表中列出的计算内表单,除非使用不通常连接的异步事件式任务,否则在方法选择方面几乎没有区别。
work-stealing(工作窃取),ForkJoinPool提供了一个更有效的利用线程的机制,当ThreadPoolExecutor还在用单个队列存放任务时,ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工作,当有线程提前完成时,会从队列的末端“窃取”其他线程未执行完的任务,当任务量特别大时,CPU多的计算机会表现出更好的性能。
公共池默认使用默认参数构建,但这些可以通过设置三个system properties来控制 :
java.util.concurrent.ForkJoinPool.common.parallelism- 并行级别,非负整数java.util.concurrent.ForkJoinPool.common.threadFactory- 类名java.util.concurrent.ForkJoinPool.common.exceptionHandler- 一个Thread.UncaughtExceptionHandler的类名
如果一个SecurityManager存在且没有指定工厂,则默认池使用一个工厂提供的线程不启用Permissions。 系统类加载器用于加载这些类。 建立这些设置有任何错误,使用默认参数。 通过将parallelism属性设置为零,和/或使用可能返回null的工厂,可以禁用或限制公共池中的线程的使用。 但是这样做可能导致未连接的任务永远不会被执行。
实现注意事项 :此实现将运行的最大线程数限制为32767.尝试创建大于最大数目的池导致IllegalArgumentException 。此实现仅在池关闭或内部资源耗尽时拒绝提交的任务(即抛出RejectedExecutionException)。
# newWorkStealingPool
WorkStealingPool 线程池通过 Executors 类的 ExecutorService newWorkStealingPool() 方法创建,其核心线程数为机器的核心数。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true);
}
2
3
4
5
6
WorkStealingPool 线程池采用 工作窃取 模式,相比于一般的线程池实现,工作窃取 模式的优势体现在对递归任务的处理方式上。
在一般的线程池中,若一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。 而在 工作窃取 模式中,若某个子问题由于等待另外一个子问题的完成而无法继续运行,则处理该子问题的线程会主动寻找其他尚未运行的子问题(窃取过来)来执行。这种方式减少了线程的等待时间,提高了性能。
public class WorkStealingPool {
public static void main(String[] args) throws IOException {
// CPU 核数
System.out.println(Runtime.getRuntime().availableProcessors());
// workStealingPool 会自动启动cpu核数个线程去执行任务
ExecutorService service = Executors.newWorkStealingPool();
service.execute(new R(1000)); // 我的cpu核数为4 启动5个线程,其中第一个是1s执行完毕,其余都是2s执行完毕,
// 有一个任务会进行等待,当第一个执行完毕后,会再次偷取第5个任务执行
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
service.execute(new R(2000));
}
// 因为work stealing 是deamon线程,即后台线程,精灵线程,守护线程
// 所以当main方法结束时, 此方法虽然还在后台运行,但是无输出
// 可以通过对主线程阻塞解决
System.in.read();
}
static class R implements Runnable {
int time;
R(int time) {
this.time = time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + time);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
六种常见的线程池基本够我们使用了,但是《阿里把把开发手册》不建议我们直接使用Executors类中的线程池,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学需要更加明确线程池的运行规则,规避资源耗尽的风险。
但如果你及团队本身对线程池非常熟悉,又确定业务规模不会大到资源耗尽的程度(比如线程数量或任务队列长度可能达到Integer.MAX_VALUE)时,其实是可以使用JDK提供的这几个接口的,它能让我们的代码具有更强的可读性。
参考资料:深入浅出Java多线程#线程池原理