池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
三大创建方法、七大参数、四种拒绝策略…

1、为什么要用线程池?

  • 降低资源消耗: 通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。

  • 提高响应速度: 任务到达时,无需等待线程创建即可立即执行。

  • 提高线程的可管理性: 线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

  • 提供更多更强大的功能: 线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

线程池解决的问题是什么

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。

  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。

  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia

“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。

在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

  1. 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。

  2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。

  3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

在了解完“是什么”和“为什么”之后,下面我们来一起深入一下线程池的内部实现原理。

2、线程池的三大创建方法

通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool: 该方法返回一个固定线程数量的线程池。 该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。 若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。 线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

3、实现 Runnable 接口和 Callable 接口的区别

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口, 这样代码看起来会更加简洁。

工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

Runnable.java

1
2
3
4
5
6
7
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}

Callable.java

1
2
3
4
5
6
7
8
9
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}

4、执行 execute()方法和 submit()方法的区别是什么呢?

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功, 并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

5、ThreadPoolExecutor 类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

5.1 ThreadPoolExecutor构造函数重要参数分析

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数:

  • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :饱和策略。关于饱和策略下面单独介绍一下。

5.2 ThreadPoolExecutor 饱和策略(4种拒绝策略)

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy。

6、线程池原理分析

为了搞懂线程池的原理,我们需要首先分析一下 execute方法。
看看它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
return c & CAPACITY;
}

private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();

// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}

图解:
20210318164037

7、关闭线程池

Java提供的对ExecutorService的关闭方式有两种,一种是调用其shutdown()方法,另一种是调用shutdownNow()方法。这两者是有区别的。

以下内容摘自源代码内的注释

1
2
3
4
// shutdown()
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
Invocation has no additional effect if already shut down.
This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.
1
2
3
4
// shutdownNow()
Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that.
There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via interrupt, so any task that fails to respond to interrupts may never terminate.

shutdown()

1、调用之后不允许继续往线程池内继续添加线程;
2、线程池的状态变为SHUTDOWN状态;
3、所有在调用shutdown()方法之前提交到ExecutorSrvice的任务都会执行;
4、一旦所有线程结束执行当前任务,ExecutorService才会真正关闭。

shutdownNow()

1、该方法返回尚未执行的 task 的 List;
2、线程池的状态变为STOP状态;
3、阻止所有正在等待启动的任务, 并且停止当前正在执行的任务。

简单点来说,就是:

shutdown()调用后,不可以再 submit 新的 task,已经 submit 的将继续执行
shutdownNow()调用后,试图停止当前正在执行的 task,并返回尚未执行的 task 的 list

源码分析

这里用的是JDK1.8,首先进入ThreadPoolExecutorshutDown()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}

private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

void onShutdown() {}

final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

进入shutDownNow()方法看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}

private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

实战

1、Demo1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.concurrent.executorService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author riemann
* @date 2019/07/28 23:41
*/
public class ExecutorServiceDemo1 {

static Runnable run = () -> {
try {
Thread.sleep(5000);
System.out.println("thread finish");
} catch (InterruptedException e) {
e.printStackTrace();
}
};

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(run);
service.shutdown();
service.execute(run);
}

}

输出结果:

1
2
3
4
5
6
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.concurrent.executorService.ExecutorServiceDemo1$$Lambda$1/1854731462@312b1dae rejected from java.util.concurrent.ThreadPoolExecutor@7530d0a[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.concurrent.executorService.ExecutorServiceDemo1.main(ExecutorServiceDemo1.java:25)
thread finish

当调用shutdown()之后,将不能继续添加任务,否则会抛出异常RejectedExecutionException。并且当正在执行的任务结束之后才会真正结束线程池。

2、Demo2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.concurrent.executorService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author riemann
* @date 2019/07/29 0:03
*/
public class ExecutorServiceDemo2 {

static Runnable run = () -> {
try {
Thread.sleep(5000);
System.out.println("thread finish");
} catch (InterruptedException e) {
e.printStackTrace();
}
};

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(run);
service.shutdownNow();
}

}

输出结果:

1
2
3
4
5
6
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.concurrent.executorService.ExecutorServiceDemo2.lambda$static$0(ExecutorServiceDemo2.java:14)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

使用shutdownNow(),若线程中有执行sleep/wait/定时锁等,直接终止正在运行的线程并抛出 interrupt 异常。因为其内部是通过Thread.interrupt()实现的。
但是这种方法有很强的局限性。因为如果线程中没有执行sleep等方法的话,其无法终止线程。

3、Demo3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.concurrent.executorService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author riemann
* @date 2019/07/29 0:05
*/
public class ExecutorServiceDemo3 {

static Runnable run = () -> {
long num = 0;
boolean flag = true;
while (flag) {
num += 1;
if (num == Long.MAX_VALUE) {
flag = false;
}
}
};

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(1);
service.execute(run);
service.shutdownNow();
}

}

很多代码中都会有这样的情况,比方说使用循环标记flag循环执行一些耗时长的计算任务, 直到满足某个条件之后才设置循环标记为false
如 Demo3 代码所示 (循环等待的情况),shutdownNow()无法终止线程。
如果遇到这种情况,可以使用如 Demo4 中的方法。

4、Demo4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.concurrent.executorService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author riemann
* @date 2019/07/29 0:12
*/
public class ExecutorServiceDemo4 {

static Runnable run = () -> {
long num = 0;
while (true && !Thread.currentThread().isInterrupted()) {
num += 1;
}
System.out.println(num);
};

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(1);
service.execute(run);
service.shutdownNow();
}

}

输出结果:

1
0

对于循环等待的情况,可以引入变量Thread.currentThread().isInterrupted()来作为其中的一个判断条件。
isInterrupted()方法返回当前线程是否有被 interrupt。
shutdownNow()的内部实现实际上就是通过 interrupt 来终止线程,所以当调用shutdownNow()时,isInterrupted()会返回true
此时就可以跳出循环等待。
然而这也不是最优雅的解决方式,具体可以参见 Demo5。

5、Demo5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.concurrent.executorService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author riemann
* @date 2019/07/29 0:17
*/
public class ExecutorServiceDemo5 {

static Runnable run = () -> {
long num = 0;
boolean flag = true;
while (flag && !Thread.currentThread().isInterrupted()) {
num += 1;
if (num == Long.MAX_VALUE) {
flag = false;
}
}
System.out.println(num);
};

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(1);
service.execute(run);
service.shutdown();
try {
if (!service.awaitTermination(2, TimeUnit.SECONDS)) {
service.shutdownNow();
}
} catch (InterruptedException e) {
service.shutdownNow();
}
}

}

输出结果:

1
999032162

这里。先调用shutdown()使线程池状态改变为SHUTDOWN,线程池不允许继续添加线程,并且等待正在执行的线程返回。
调用awaitTermination设置定时任务,代码内的意思为 2s 后检测线程池内的线程是否均执行完毕(就像老师告诉学生,“最后给你 2s 钟时间把作业写完”),若没有执行完毕,则调用shutdownNow()方法。

一些问题

线程池被创建后里面有线程吗?如果没有的话,你知道有什么方法对线程池进行预热吗?

线程池被创建后如果没有任务过来,里面是不会有线程的。如果需要预热的话可以调用下面的两个方法:

启动一个

20210518144636

全部启动

20210518144659

核心线程数会被回收吗?需要什么设置?

核心线程数默认是不会被回收的,如果需要回收核心线程数,需要调用下面的方法:

20210518144740

allowCoreThreadTimeOut 该值默认为 false。

20210518144801

小结

线程池最大线程数到底该如何定义

  1. CPU 密集型:电脑是几核,就是几,可以保持CPU的效率最高。
    1
    2
    // 获取CPU核心数
    Runtime.getRuntime().availableProcessors()
  2. IO 密集型:判断程序中十分耗IO的线程有多少个,大于这个数(或者2倍)

参考

线程池

线程池的关闭方式有几种,各自的区别是什么。

好文推荐:Java线程池实现原理及其在美团业务中的实践

评论