总结一下 Java 线程池的使用

总结一下 Java 线程池的使用。

总览

  1. Executor 位于最顶层,也是最简单的,就一个 execute(Runnable runnable) 接口方法定义。
  2. ExecutorService 也是接口,在 Executor 接口的基础上添加了很多的接口方法,所以一般来说我们会使用这个接口
  3. 然后再下来一层是 AbstractExecutorService,从名字我们就知道,这是抽象类,这里实现了非常有用的一些方法供子类直接使用。
  4. 然后才到我们的重点部分 ThreadPoolExecutor 类,这个类提供了关于线程池所需的非常丰富的功能。
  5. 同在并发包中的 Executors 类,类名中带字母 s,我们猜到这个是工具类,里面的方法都是静态方法,如以下我们最常用的用于生成 ThreadPoolExecutor 的实例的一些方法。
  6. 另外,由于线程池支持获取线程执行的结果,所以,引入了 Future 接口,RunnableFuture 继承自此接口,然后我们最需要关心的就是它的实现类 FutureTask。到这里,记住这个概念,在线程池的使用过程中,我们是往线程池提交任务(task),使用过线程池的都知道,我们提交的每个任务是实现了 Runnable 接口的,其实就是先将 Runnable 的任务包装成 FutureTask,然后再提交到线程池。
  7. 线程池中的 BlockingQueue 也是非常重要的概念,如果线程数达到 corePoolSize,我们的每个任务会提交到等待队列中,等待线程池中的线程来取任务并执行。这里的 BlockingQueue 通常我们使用其实现类 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每个实现类都有不同的特征,使用场景之后会慢慢分析。

Executor

1
2
3
4
5
6
7
/* 
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
void execute(Runnable command);
}

Executor 接口非常简单,就一个 void execute(Runnable command) 方法,代表提交一个任务。Executor 这个接口只有提交任务的功能,太简单了,我们想要更丰富的功能,比如我们想知道执行结果、我们想知道当前线程池有多少个线程活着、已经完成了多少任务等等,这些都是这个接口的不足的地方。接下来我们要介绍的是继承自 Executor 接口的 ExecutorService 接口,这个接口提供了比较丰富的功能,也是我们最常使用到的接口。

ExecutorService

一般我们定义一个线程池的时候,往往都是使用这个接口:

1
2
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);

们简单初略地来看一下这个接口中都有哪些方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface ExecutorService extends Executor {

// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();

// 它和上面的方法相比,区别在于它会去停止当前正在进行的任务
List<Runnable> shutdownNow();

// 判断线程池是否已关闭
boolean isShutdown();

// 如果调用了 shutdown 或 shutdownNow 方法后,所有任务结束了那么返回 true
boolean isTerminated();

// 等待所有任务完成并设置超时时间,先调用 shutdown 或 shutdownNow
// 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);

// 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
// 因为 Runnable 的 run 方法本身并不返回任何东西
<T> Future<T> submit(Runnable task, T result);

// 提交一个 Runnable 任务
Future<?> submit(Runnable task);

// 执行所有任务,返回 Future 类型的一个 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 也是执行所有任务,但是这里设置了超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 同上一个方法,只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,
// 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

Runnable 的 void run() 方法是没有返回值的,通如果我们需要的话,会在 submit 中指定第二个参数作为返回值:

1
<T> Future<T> submit(Runnable task, T result);

其实到时候会通过这两个参数,将其包装成 Callable。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常,call() 方法会抛出异常。

1
2
3
4
public interface Callable<V> {

V call() throws Exception;
}

JDK 提供的 Future 不支持回调,如果有支持 callback 的场景,可以借助 Guava 来的 MoreExecutors 工具类来解决,可以将 JDK 的 ExecutorService 转换成 ListeningExecutorService,Guava 的 ListenableFuture 能够很好的支持回调。

AbstractExecutorService

AbstractExecutorService 抽象类派生自 ExecutorService 接口,然后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。

这个抽象类实现了 invokeAny 方法和 invokeAll 方法,这里的两个 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。定义于最上层接口 Executor中的 void execute(Runnable command) 由于不需要获取结果,不会进行 FutureTask 的包装。需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
// 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

// 提交任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交给执行器执行,execute 方法由具体的子类来实现
// 前面也说了,FutureTask 间接实现了Runnable 接口。
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}

// invoke 省去
}

这个抽象类包装了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它们都没有真正开启线程来执行任务,它们都只是在方法内部调用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法还没出现,需要等具体执行器来实现这个最重要的部分,这里我们要说的就是 ThreadPoolExecutor 类了。

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。我们可以基于它来进行业务上的扩展,以实现我们需要的其他功能,比如实现定时任务的类 ScheduledThreadPoolExecutor 就继承自 ThreadPoolExecutor。

我们经常会使用 Executors 这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是很有用的,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值。

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

这里先不说有什么区别,它们最终都会导向这个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 这几个参数都是必须要有的
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  1. corePoolSize,核心线程数。
  2. maximumPoolSize,最大线程数,线程池允许创建的最大线程数。
  3. workQueue,任务队列,BlockingQueue 接口的某个实现。
  4. keepAliveTime,空闲线程的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也可以被回收。
  5. threadFactory,用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些。
  6. handler,当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。有几种方式可供选择,像抛出异常、直接拒绝然后返回等,也可以自己实现相应的接口实现自己的逻辑。

状态变化

介绍下线程池中的各个状态和状态变化的转换过程:

  • RUNNING:这是最正常的状态:接受新的任务,处理等待队列中的任务。
  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务。
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。
  • TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。
  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个。

    Doug Lea 把线程池中的线程包装成了一个个 Worker,任务是 Runnable(内部变量名叫 task 或 command),线程是 Worker

Executors

生成一个固定大小的线程池:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0,任务队列采用 LinkedBlockingQueue,无界队列。刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中。

生成只有一个线程的固定线程池,这个更简单,和上面的一样,只要设置线程数为 1 就可以了:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。

ThreadFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

BlockingQueue

BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。

BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:1、抛出异常;2、返回特殊值(null 或 true/false,取决于具体的操作);3、阻塞等待此操作,直到这个操作成功;4、阻塞等待此操作,直到成功或者超时指定时间。总结如下:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

JDK 中对于 BlockingQueue 有多种实现,其中最常用的是 ArrayBlockingQueue 以及 LinkedBlockingQueue,以 ArrayBlockingQueue 为例,我们来仔细看看它的实现。

ArrayBlockingQueue

ArrayBlockingQueue 中的主要属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 存储元素的数组,是个循环数组
final Object[] items;

// 下一次读数据的时候的索引
int takeIndex;

// 下一次写数据的时候的索引
int putIndex;

// 队列中已经存储元素的个数
int count;

// 锁
final ReentrantLock lock;

// 等待读数据的的条件对象
private final Condition notEmpty;

//等待写数据的的条件对象
private final Condition notFull;

首先看一下 add 的实现,add 是直接依赖于 offer:

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

offer 代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

比较简单,写数据之前会先拿到锁,如果满了就返回 false,返回 false 上面的 add 方法就会抛异常。解下来在看一下 put 方法:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

lockInterruptibly 方法,与 lock 方法相比,lockInterruptibly 可以被中断,中断的时候产生 InterruptedException 异常。如果已经满了则执行 wait 逻辑等待,这里就是阻塞队列的核心。offer 与 put 都用到了 enqueue 方法向容器中加入元素,接下来我们看下 enqueue 方法:

1
2
3
4
5
6
7
8
9
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}

如果达到数组长度则将 putIndex 置为 0,相当于下一次放入元素位置为数组的第一次位置,从头开始放入元素,上面说过 items 是个循环数组,就是在这里体现出来的,如果我们初始化的时候设定容器 items 大小为10,然而我们不停放入数据也是没问题的,只不过后面加入的数据会覆盖前面的数据。最后会唤醒读数据的线程。

接下来分析读数据的方法,读数据主要是 poll与 take 方法,先来看下 pool 非常简单:

1
2
3
4
5
6
7
8
9
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

take 也不难理解,如果 count 为 0 也就是容器内没有数据则执行 wait 方法,线程一直处于等待状态,如果不为 0 则执行 dequeue。

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

接下来看一下 deque 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

总结起来就是从数组 items 中取出数据,并将原数组位置处置为 null。读数据索引 takeIndex 加 1 后与数组总长度比较如果达到数组长度则将 takeIndex 置为 0,下一次从数组开始处取数据。最后会通知放入数据的线程有数据取出了。

LinkedBlockingQueue 源码就不一一分析了,主要区别是 LinkedBlockingQueue 内部是用链表来存储数据的,并且有两把锁,放数据锁与取数据锁,也就是放入数据的线程和取出数据的线程可以同时操作 LinkedBlockingQueue,而ArrayBlockingQueue 中放数据线程与取数据线程是互斥的,不能同时操作,LinkedBlockingQueue 初始化的时候可以不指定容器大小,如果不指定则容器大小为 Integer.MAX_VALUE,而 ArrayBlockingQueue 则必须指定容器大小。

拒绝策略

接口 RejectedExecutionHandler 提供了拒绝任务处理的自定义方法的机会。在 ThreadPoolExecutor 中已经包含四种拒绝策略。

  1. AbortPolicy,抛出运行时异常 RejectedExecutionException,这种策略丢弃任务并抛出异常,是默认策略。
  2. DiscardPolicy,不能执行的任务将被丢弃,这种策略什么都没做。
  3. DiscardOldestPolicy,在 pool 没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。
  4. CallerRunsPolicy,这个策略不想放弃执行任务。但是由于 poll 中已经没有任何资源了,那么就直接使用调用该 execute 的线程本身来执行。