Java 线程池艺术探索

线程池

Wiki 上是这样解释的:Thread Pool

作用:利用线程池可以大大减少在创建和销毁线程上所花的时间以及系统资源的开销!

下面主要讲下线程池中最重要的一个类 ThreadPoolExecutor 。

ThreadPoolExecutor

ThreadPoolExecutor1

ThreadPoolExecutor 构造器:

有四个构造器的,挑了参数最长的一个进行讲解。

constructor

七个参数:

  • corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止;
  • unit:参数keepAliveTime的时间单位(DAYS、HOURS、MINUTES、SECONDS 等);
  • workQueue:阻塞队列,用来存储等待执行的任务;
    • ArrayBlockingQueue (有界队列)
    • LinkedBlockingQueue (无界队列)
    • SynchronousQueue
  • threadFactory:线程工厂,主要用来创建线程
  • handler:拒绝处理任务的策略

    • AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。(默认这种)
    • DiscardPolicy:也是丢弃任务,但是不抛出异常
    • DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • CallerRunsPolicy:由调用线程处理该任务

    Rejected-method-Thread-Pool

重要方法:

  • execute():通过这个方法可以向线程池提交一个任务,交由线程池去执行;
  • shutdown():关闭线程池;

execute() 方法:

注:JDK 1.7 和 1.8 这个方法有点区别,下面代码是 1.8 中的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1、如果当前的线程数小于核心线程池的大小,根据现有的线程作为第一个 Worker 运行的线程,新建一个 Worker,addWorker 自动的检查当前线程池的状态和 Worker 的数量,防止线程池在不能添加线程的状态下添加线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2、如果线程入队成功,然后还是要进行 double-check 的,因为线程在入队之后状态是可能会发生变化的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// recheck 防止线程池状态的突变,如果突变,那么将 reject 线程,防止 workQueue 中增加新线程
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)//上下两个操作都有 addWorker 的操作,但是如果在workQueue.offer 的时候 Worker 变为 0,那么将没有 Worker 执行新的 task,所以增加一个 Worker.
addWorker(null, false);
}
//3、如果 task 不能入队(队列满了),这时候尝试增加一个新线程,如果增加失败那么当前的线程池状态变化了或者线程池已经满了然后拒绝task
else if (!addWorker(command, false))
reject(command);
}

其中调用了 addWorker() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private boolean addWorker(Runnable firstTask, boolean core) {// firstTask: 新增一个线程并执行这个任务,可空,增加的线程从队列获取任务;core:是否使用 corePoolSize 作为上限,否则使用 maxmunPoolSize
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* rs!=Shutdown || fistTask!=null || workQueue.isEmpty
* 如果当前的线程池的状态 > SHUTDOWN 那么拒绝 Worker 的 add 如果 =SHUTDOWN
* 那么此时不能新加入不为 null 的 Task,如果在 workQueue 为 empty 的时候不能加入任何类型的 Worker,
* 如果不为 empty 可以加入 task 为 null 的 Worker, 增加消费的 Worker
*/
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果当前的数量超过了 CAPACITY,或者超过了 corePoolSize 和 maximumPoolSize(试 core 而定)
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS 尝试增加线程数,如果失败,证明有竞争,那么重新到 retry。
if (compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操作;
break retry;
c = ctl.get(); // Re-read ctl
//判断当前线程池的运行状态,状态发生改变,重试 retry;
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);// Worker 为内部类,封装了线程和任务,通过 ThreadFactory 创建线程,可能失败抛异常或者返回 null
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
// SHUTDOWN 以后的状态和 SHUTDOWN 状态下 firstTask 为 null,不可新增线程
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//记录最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)
}
return workerStarted;
}

示意图:

coreThreadPool

执行流程:

1、当有任务进入时,线程池创建线程去执行任务,直到核心线程数满为止

2、核心线程数量满了之后,任务就会进入一个缓冲的任务队列中

  • 当任务队列为无界队列时,任务就会一直放入缓冲的任务队列中,不会和最大线程数量进行比较
  • 当任务队列为有界队列时,任务先放入缓冲的任务队列中,当任务队列满了之后,才会将任务放入线程池,此时会与线程池中最大的线程数量进行比较,如果超出了,则默认会抛出异常。然后线程池才会执行任务,当任务执行完,又会将缓冲队列中的任务放入线程池中,然后重复此操作。

shutdown() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断是否可以操作目标线程
checkShutdownAccess();
//设置线程池状态为 SHUTDOWN, 此处之后,线程池中不会增加新 Task
advanceRunState(SHUTDOWN);
//中断所有的空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//转到 Terminate
tryTerminate();
}

参考资料:深入理解java线程池—ThreadPoolExecutor

JDK 自带四种线程池分析与比较

four-Thread-Pool

1、newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newFixedThreadPool

2、newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。newSingleThreadExecutor

3、newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newCachedThreadPool

4、newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

newScheduledThreadPool

四种线程池其实内部方法都是调用的 ThreadPoolExecutor 类,只不过利用了其不同的构造器方法而已(传入自己需要传入的参数),那么利用这个特性,我们自己也是可以实现自己定义的线程池的。

自定义线程池

1、创建任务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.zhisheng.thread.threadpool.demo;
/**
* Created by 10412 on 2017/7/24.
* 任务
*/
public class MyTask implements Runnable
{
private int taskId; //任务 id
private String taskName; //任务名字
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}

public MyTask(int taskId, String taskName) {
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("当前正在执行 ****** 线程Id-->" + taskId + ",任务名称-->" + taskName);
try {
Thread.currentThread().sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程Id-->" + taskId + ",任务名称-->" + taskName + " ----------- 执行完毕!");
}
}

2、自定义拒绝策略,实现 RejectedExecutionHandler 接口,重写 rejectedExecution 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.zhisheng.thread.threadpool.demo;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Created by 10412 on 2017/7/24.
* 自定义拒绝策略,实现 RejectedExecutionHandler 接口
*/
public class RejectedThreadPoolHandler implements RejectedExecutionHandler
{
public RejectedThreadPoolHandler() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("WARNING 自定义拒绝策略: Task " + r.toString() + " rejected from " + executor.toString());
}
}

3、创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.zhisheng.thread.threadpool.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by 10412 on 2017/7/24.
*/
public class ThreadPool
{
public static void main(String[] args) {
//核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s,有界队列长度为 3,
//ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));

//核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s, 无界队列,
//ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());

//核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s,有界队列长度为 3, 使用自定义拒绝策略
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3), new RejectedThreadPoolHandler());

for (int i = 1; i <= 10; i++) {
//创建 10 个任务
MyTask task = new MyTask(i, "任务" + i);
//运行
pool.execute(task);
System.out.println("活跃的线程数:"+pool.getActiveCount() + ",核心线程数:" + pool.getCorePoolSize() + ",线程池大小:" + pool.getPoolSize() + ",队列的大小" + pool.getQueue().size());
}

//关闭线程池
pool.shutdown();
}
}

这里运行结果就不截图了,我在本地测试了代码是没问题的,感兴趣的建议还是自己跑一下,然后分析下结果是不是和前面分析的一样,如有问题,请在我博客下面评论!

总结

本文一开始讲了线程池的介绍和好处,然后分析了线程池中最核心的 ThreadPoolExecutor 类中构造器的七个参数的作用、类中两个重要的方法,然后在对比研究了下 JDK 中自带的四种线程池的用法和内部代码细节,最后写了一个自定义的线程池。

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 线程池
  2. 2. ThreadPoolExecutor
  3. 3. JDK 自带四种线程池分析与比较
    1. 3.1. 1、newFixedThreadPool
    2. 3.2. 2、newSingleThreadExecutor
    3. 3.3. 3、newCachedThreadPool
    4. 3.4. 4、newScheduledThreadPool
  4. 4. 自定义线程池
  5. 5. 总结