死磕ThreadPoolExecutor线程池

Rothschil 2020-05-14 15:53:00
Java,ThreadPoolExecutor

1. 线程池的优势

线程空间大小

线程空间大小和具体JDK版本有很大关系,JDK8将近1.9M、JDK11差不多1.5M多。具体大小的查看可以执行命令java -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:+PrintNMTStatistics -version

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
C:\Users\WONGS> java -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:+PrintNMTStatistics -version
java version "11.0.2" 2019-01-15 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.2+9-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.2+9-LTS, mixed mode)

Native Memory Tracking:

Total: reserved=7849030KB, committed=465994KB
- Java Heap (reserved=6248448KB, committed=391168KB)
(mmap: reserved=6248448KB, committed=391168KB)

- Class (reserved=1056866KB, committed=4578KB)
(classes #472)
( instance classes #407, array classes #65)
(malloc=98KB #502)
(mmap: reserved=1056768KB, committed=4480KB)
( Metadata: )
( reserved=8192KB, committed=4096KB)
( used=3120KB)
( free=976KB)
( waste=0KB =0.00%)
( Class space:)
( reserved=1048576KB, committed=384KB)
( used=297KB)
( free=87KB)
( waste=0KB =0.00%)

- Thread (reserved=16455KB, committed=591KB)
(thread #16)
(stack: reserved=16384KB, committed=520KB)
(malloc=52KB #89)
(arena=19KB #30)
......

2. 几种常见线程池

构造函数

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

构造函数

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

构造函数

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

构造函数

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

ScheduledThreadPoolExecutor类结构图

通过类图,我们分析,其实 ScheduledThreadPoolExecutorThreadPoolExecutor 子类。

综上所述,我们可以看到这些线程池底层实现都依靠 ThreadPoolExecutor 类的构造器,它是构造线程池的核心实现。但是现实在开发过程中避免利用 Executors 去创建线程池,这容易让人疑惑,JDK命名自带实现,为什么避免用,看完下一章节后,我们再谈这个话题。

3. 解析ThreadPoolExecutor

构造函数

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NotNull TimeUnit unit,
@NotNull BlockingQueue<Runnable> workQueue,
@NotNull ThreadFactory threadFactory,
@NotNull RejectedExecutionHandler handler)

ThreadPoolExecutor结构类图

3.1. corePoolSize、maximumPoolSize

corePoolSize、maximumPoolSize 线程池中初始化的线程数量,初始化太多或者太少,都有可能造成资源的浪费,具体实际情况根据所需要处理的任务特征决定。

3.2. workQueue

将待处理的任务放入一个队列,这是一个阻塞队列,该队列可以是有界也可以是无界。

3.3. handler

其实上述四种策略都不够友好,在实际应用场景中,肯定要记录日志或者通过RPC框架触发通知补偿措施,否则会造成数据丢失或者处理过程不够严谨。一般情况下,我们需要自己实现 RejectedExecutionHandler 接口,在接口中记录日志或者持久化不能处理的任务信息。再通过定时任务,进行补偿重试。

3.4. 线程池执行顺序

线程池执行逻辑

下面将举几个例子。

3.4.1. 无界队列样例

定义核心线程数,corePoolSize 为 1;无界队列LinkedBlockingQueue同时为展示更好的效果,我让每个线程执行后都sleep 秒钟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadPoolExecutorDemo2 {

public static ExecutorService getExecutorService(int coreSize,int maxSize,BlockingQueue queue){
RejectedExecutionHandler policy = new ThreadPoolExecutor.AbortPolicy();
return new ThreadPoolExecutor(coreSize, maxSize,0, TimeUnit.SECONDS, queue, policy);
}

public static void main(String[] args) {
// ArrayBlockingQueue queue = new ArrayBlockingQueue(3);
LinkedBlockingQueue queue = new LinkedBlockingQueue();
ExecutorService es = getExecutorService(1,5,queue);

ThreadTaskDemo t1 = new ThreadTaskDemo("t1");
ThreadTaskDemo t2 = new ThreadTaskDemo("t2");
ThreadTaskDemo t3 = new ThreadTaskDemo("t3");
ThreadTaskDemo t4 = new ThreadTaskDemo("t4");
ThreadTaskDemo t5 = new ThreadTaskDemo("t4");

es.execute(t1);
es.execute(t2);
es.execute(t3);
es.execute(t4);
es.execute(t5);

System.out.println("执行完毕!");
es.shutdown();
}
}

public class ThreadTaskDemo implements Runnable{

@Getter
@Setter
private String value;

public ThreadTaskDemo(){

}

public ThreadTaskDemo(String value){
this.value=value;
}

@Override
public void run() {
System.out.println("当前时间 "+LocalDateTime.now().getSecond()+" 当前线程名: "+Thread.currentThread().getName()+" BEGIN "+value );
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


通过控制台输出,我们可以看到,任务只在一个线程中有序执行,说明 maximumPoolSize 参数配置无意义,并未有创建线程的操作。

1
2
3
4
5
当前时间 35 当前线程名: pool-1-thread-1 BEGIN t1
当前时间 37 当前线程名: pool-1-thread-1 BEGIN t2
当前时间 39 当前线程名: pool-1-thread-1 BEGIN t3
当前时间 41 当前线程名: pool-1-thread-1 BEGIN t4
当前时间 43 当前线程名: pool-1-thread-1 BEGIN t5

演示效果

3.4.2. 有界队列样例

定义核心线程数,corePoolSize 为 1;有界队列ArrayBlockingQueue 设置 3、同时为展示更好的效果,我也让每个线程执行后都sleep 秒钟。

1
2
3
4
5
6

当前时间 30:14 当前线程名: pool-1-thread-2 BEGIN t5
当前时间 30:14 当前线程名: pool-1-thread-1 BEGIN t1
当前时间 30:16 当前线程名: pool-1-thread-1 BEGIN t2
当前时间 30:16 当前线程名: pool-1-thread-2 BEGIN t3
当前时间 30:18 当前线程名: pool-1-thread-2 BEGIN t4
1
2
3
4
5
6
7
8
9
10

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task xyz.wongs.interview.thread.pool.ThreadTaskDemo@4dfa3a9d rejected from java.util.concurrent.ThreadPoolExecutor@6eebc39e[Running, pool size = 1, active threads = 1, queued tasks = 3, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at xyz.wongs.interview.thread.pool.ThreadPoolExecutorDemo2.main(ThreadPoolExecutorDemo2.java:29)
当前时间 18:16 当前线程名: pool-1-thread-1 BEGIN t1
当前时间 18:18 当前线程名: pool-1-thread-1 BEGIN t2
当前时间 18:20 当前线程名: pool-1-thread-1 BEGIN t3
当前时间 18:22 当前线程名: pool-1-thread-1 BEGIN t4

有界队列执行顺序

综上所述,在 有界队列实现中我们要注意,任务数最大线程池容量队列容量三者之间的关系。

3.4.3. 自定义handler

编写一个 Java类,实现接口 RejectedExecutionHandler,重写 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法,具体如下

1
2
3
4
5
6
7
8
9
10
11
12
public class CoustomRejectedExecutionHandler implements RejectedExecutionHandler {


@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof ThreadTaskDemo) {
ThreadTaskDemo thTk = (ThreadTaskDemo) r;
//为了演示用,所以直接打印,勿模仿。正式场景下应该持久化或者写入日志!!!
System.out.println("当前任务 "+ thTk.getValue()+" 执行失败!");
}
}
}

我们再运行下例子,我们可以发现并没抛出异常,而且控制应用也关闭。

自定义拒绝策略后验证效果

3.5. 禁用Executors创建线程池

通过上面我们简单了解线程池的构造函数参数的意义,我们线程池再线程创建时,其构造函数中指定的队列 LinkedBlockingQueue,这是一种无界的队列,最大值 Integer.MAX_VALUE 即214748364,这队列堆积数量过大,在实际生产中可能直接OOM,不信的话。好奇同学也会说不是还有 newCachedThreadPool,但是它的最大线程数量是 Integer.MAX_VALUE,道理一样,容易造成OOM。

所以很多大型公司在编码规范上都禁止利用 Executors创建线程池。

3.6. 第三方常见创建线程池的方式

3.6.1. 引入 commons-lang3 包方式【不推荐】

1
2
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build();
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,threadFactory);

3.6.2. 引入 com.google.guava 包方式【一般推荐】

1
2
3
4
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build();

ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());

4. 常见问题

4.1. newFixedThreadPool(1) 与 newSingleThreadExecutor 区别