Java多线程ThreadPoolExecutor初探

在java中,使用线程时通过new Thread实现很简单,但是如果并发数量很多时,频繁地创建线程就会大大降低系统的效率。

所以可以通过线程池,使得线程可以复用,每执行完一个任务,并不是被销毁,而是可以继续执行其他任务。

花了两天时间去看了高洪岩写的《JAVA并发编程》,是想要知其然,知其所以然,在使用的情况下,了解学习了一下原理记录下java.util.concurrent并发包下的ThreadPoolExecutor特性和实现


使用示例

粗暴点,我们直接看如何使用吧

(一)使用Executors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
简单举个🌰:
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
具体实现逻辑:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

通过该Executors的静态方法进行线程池的创建,而且从具体实现来看,还是调用了new ThreadPoolExecutor(),只是内部参数已经帮我们配置好了。

(二) 使用ThreadPoolExecutor

既然真正实现都是用ThreadPoolExecutor,那就自己设定好方法的参数吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.HOURS, new LinkedBlockingDeque<>());

for(int i=0;i<10;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();

}

static class MyTask implements Runnable {
private int taskNum;

public MyTask(int num) {
this.taskNum = num;
}

@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}

打印效果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完别的任务数目:0
task 2执行完毕
task 0执行完毕
task 3执行完毕
task 1执行完毕
正在执行task 8
task 4执行完毕
正在执行task 7
正在执行task 6
正在执行task 5
正在执行task 9
task 8执行完毕
task 6执行完毕
task 7执行完毕
task 5执行完毕
task 9执行完毕

任务Task提交之后,由于是多线程状态下,所以打印效果并不是同步的,可以看出任务都已经顺利执行。

我这个实现参数是5个corePoolSize核心线程数和5个maximumPoolSize最大线程数,当线程池中的线程数超过5个的时候,将新来的任务放进缓存队列中,小伙伴可以试下把任务数(for循环的个数)提高一点,让缓存等待的任务数超过5个,看看默认的任务拒绝策略(AbortPolicy)会抛出什么错误hhh

下面来看看ThreadPoolExecutor的庐山真面目吧~


ThreadPoolExecutor

它有以下四个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

从构造方法可以看出,前三个方法最终都是调用第四个构造器进行初始化工作的。

参数解释:

  • corePoolSize:池中保持的线程数,包括空闲的线程,也就是核心池的大小
  • maximumPoolSize:池中锁允许最大线程数
  • keepAliveTime:当线程数量超过corePoolSize,在没有超过指定的时间内不从线程池中删除,如果超过该时间,则删除
  • unit:keepAliveTime的时间单位
  • workQueue:执行前用来保存任务的队列,此队列只保存由execute方法提交的Runnable任务

workQueue(任务队列,是一个阻塞队列)

ArrayBlockingQueue:
1
2
3
4
5
6
7
8
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
LinkedBlockingDeque:(支持列头和列尾操作,pollFirst/pollLast)
1
2
3
4
5
6
7
8
9
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}


public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}

从源码构造函数可以看到,不传参数的时候,默认阻塞队列中的大小是Integer.MAX_VALUE;

SynchronousQueue:
1
2
3
4
5
6
7
public SynchronousQueue() {
this(false);
}

public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

Array和Linked在传入大小小于0时将会报错,比较常用的是LinkedBlockingDeque和SynchronousQueue,线程池的排队策略与BlockingQueue有关

ThreadFactory:线程工厂

主要用来创建线程,可以在newThread()方法中自定义线程名字和设置线程异常情况的处理逻辑。

举个🌰:

1
2
3
4
5
6
7
8
9
10
11
12
static class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread();
thread.setName("JingQ" + new Date());
thread.setUncaughtExceptionHandler((t, e) -> {
doSomething();
e.printStackTrace();
});
return thread;
}
}

handler:拒绝策略

有以下四种:

  • ThreadPoolExecutor.AbortPolicy:当任务添加到线程中被拒绝时,它会抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:任务被拒绝时,线程池丢弃被拒绝的任务
  • ThreadPoolExecutor.DiscardOldestPolicy:任务被拒绝时,线程池会放弃等待队列中最旧的未处理文物,然后将被拒绝的任务添加到等待队列中
  • ThreadPoolExecutor.CallerRunsPolicy:任务被拒绝时,会使用调用线程池的Thread线程对象处理被拒绝的任务

ThreadPoolExecutor继承结构

可以看出,实际上ThreadPoolExecutor是继承了AbstractExecutorService类和引用了ExecutorService、Executor接口。

AbstractExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class AbstractExecutorService implements ExecutorService {

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}

AbstarctExecutorService是一个抽象类,它实现的是ExecutorService接口

ExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

接口ExecutorService引用了Executor接口,Executor接口比较简单,只有一个execute方法定义

Executor

1
2
3
public interface Executor {
void execute(Runnable command);
}

小结:

Executor是一个顶级接口,定义了一个execute方法,返回值为空,参数为Runnable。

ExecutorService继承了Executor并且定义了其它一些方法,结果如下图:

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法。

最后ThreadPoolExecutor继承了AbstractExecutorService,我们最常用到它两个方法,submit和execute,下面介绍一下这两者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
execute():
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* (以下是个人渣翻译,有误请轻喷~)
* 有以下三步流程:
*
* 1. 如果少于核心池大小的线程正在运行,
* 那么尝试以给定的命令作为它的第一个任务启动一个新线程。
* 调用添加worker原子性检查运行状态和workder的数量,
* 这样可以防止错误警报在不应该返回的情况下添加线程,返回false。
*
* 2. 如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加一个线程
* (因为现有的线程在上次检查后死亡),或者是在该方法进入后关闭了池。
* 因此,我们重新检查状态,如果必要的话,如果停止的话,需要回滚队列。
* 如果没有新的线程,就去启动它
*
* 3. 如果我们不能排队任务,那么我们尝试添加一个新线程。
* 如果失败了,我们知道任务队列已经被关闭或饱和,所以拒绝这个任务。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
submit:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}


public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

小结

execute()方法在ThreadPoolExecutor中进行了重写,submit()方法是在AbstractExecutorService实现的,ThreadPoolExecutor并没有重写,并且execute方法是没有返回结果的,submit的返回类型是Future,能够获得任务的结果,但是实际执行的还是execute方法。

当然,还有例如shutdown、getQueue、getActiveCount、getPoolSize等方法没有介绍到,推荐胖友们打开IDE进行查看吧~

ps:关于线程池的原理并未深入记录,有关它的任务拒绝策略、线程初始化、ThreadPoolExecutor构造之后,当任务超过设定值,它的执行策略等原理都值得去深入学习,下回记录~