线程池基本原理

当程序中大量使用线程时,由于线程的创建与销毁需要时间,因此如果线程能够复用可以提高效率;同时,通常需要对线程进行统一的管理。线程池就是为解决这种问题而诞生的。

代码结构

Executor

此接口是线程池的顶层接口,只定义了一个方法,表示最终执行任务的行为。

1
void execute(Runnable command);

ExecutorService

此接口继承自Executor接口,并定义了线程池的基本操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

AbstractExecutorService

此抽象类实现了线程池的部分操作。

ThreadPoolExectutorService

此类是整个线程池实现的核心类。线程池的代码在Java 8有更新,这里以Java 8的代码为准。
线程池的状态由一个AtomicInteger类型的final变量ctl来控制,其高3位表示运行状态,低29位表示运行的worker数量。

1
2
3
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

运行状态包括以下几种:

1
2
3
4
5
6
7
8
9
10
// 正常接受任务并处理队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不接受新任务,但继续处理队列中的剩余任务,当调用shutdown()方法时进入此状态
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受新任务,不处理队列中的任务,且尝试中断运行中的任务,当调用shutdownNow()方法时进入此状态
private static final int STOP = 1 << COUNT_BITS;
// 所有任务已经结束,调用terminated方法
private static final int TIDYING = 2 << COUNT_BITS;
// terminated方法已完成
private static final int TERMINATED = 3 << COUNT_BITS;

此外,提交任务的两个重要方法有一些区别:submit()方法有多个重载,其实质调用execute()方法,返回Future类型的变量用以监控执行结果;execute()方法只是简单将任务提交,并不获取任务执行结果。

类型

ThreadPoolexecutor重载了四个构造方法,用以提供多种配置的线程池实现。
线程池的类型除了JDK源码提供的之外,用户也可以自己实现,这里介绍几种常见的实现,均位于工具类Executors中。

newCachedThreadPool

线程池设有缓存,如果在缓存中有工作线程可复用,新的任务将复用已缓存的线程,否则将创建新的线程执行,并在执行完后将线程放入缓存;缓存线程设置有超时时间,超时后将销毁。

newFixedThreadPool

线程池的大小固定,新任务提交时将创建新线程,作为工作线程执行或进入有界等待队列。

newScheduledThreadPool

与newFixedThreadPool很相似,且允许任务定时或周期性地执行。

newSingleThreadExecutor

线程在无界队列中顺序执行,同时只有一个活动的工作线程,线程池相当于单线程。

任务管理

线程池管理线程的步骤包括提交和处理任务,其中的参数直接影响了线程池的行为和性能。
值得注意的是,大部分提及的参数均由volatile关键字修饰,用以加强线程池本身的线程安全。

阻塞队列

线程池中有一个有界阻塞队列workQueue,用来存储待执行的任务,该队列是一个BlockingQueue接口的实现类。

拒绝处理器

池中有一个处理器handler,用来提供当线程池在某些情况下拒绝执行任务时的处理逻辑,该处理器实现了RejectedExecutionHandler接口。

拒绝处理策略

线程池有四种拒绝处理策略,均由实现了RejectedExecutionHandler接口的静态内部类来执行。

  • AbortPolicy
    丢弃提交的任务,并抛出RejectedExecutionException异常。
  • DiscardPolicy
    丢弃提交的任务,但不抛出异常,相当于什么也不做。
  • DiscardOldestPolicy
    丢弃阻塞队列头部未执行的任务,并重试将当前任务添加进队列。
  • CallerRunsPolicy
    由调用的线程直接处理该任务。

任务提交

线程池中有以下几个重要参数,决定了有新任务提交时的处理逻辑。

  • corePoolSize
    线程池的基本大小。
  • maximumPoolSize
    线程池中允许的最大线程数。
  • poolSize
    线程池中当前线程的数量。
  • queueSize
    线程池等待队列的当前线程数。
  • queueCapacity
    线程池等待队列的最大线程数。

每当线程池新增任务时:

  • 如果poolSize小于corePoolSize,直接新增线程进行处理,poolSize加1。
  • 如果poolSize达到corePoolSize,新任务放入阻塞队列中等待执行。
  • 如果queueSize达到queueCapacity,但poolSize仍小于maximumPoolSize,新增线程处理。
  • 如果queueSize达到queueCapacity,且poolSize达到maximumPoolSize, 根据RejectedExecutionHandler的逻辑处理。