JUC - ThreadPoolExecutor 源码分析
ThreadPoolExecutor,Java线程池。使用线程池可以降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;可以提高响应速度,当任务到达时,任务可以不需要等待线程创建就能立即执行;可以提高线程的可管理性,防止无限制的创建线程,消耗系统资源;控制任务执行的并发量。
下面首先通过初始化参数介绍一下线程池:
线程池
初始化参数
|
|
corePoolSize:线程池的基本大小,核心线程数,当提交一个任务到线程池时,如果线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。核心线程会一直存活,即使没有任务需要处理,除非调用了allowCoreThreadTimeOut方法则允许核心线程超时终止。
maximumPoolSize:线程池最大大小,当线程数大于或等于核心线程,当任务队列已满,且已创建的线程数小于最大线程数,线程池会创建新的工作线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
workQueue:任务队列,用于保存等待执行的任务的阻塞队列,当达到corePoolSize的时候,就向该等待队列放入线程信息 。可以选择以下几个阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
unit:线程活动保持时间keepAliveTime的单位,可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
handler:饱和策略,当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。以下是JDK提供的策略。
AbortPolicy:表示无法处理新任务时抛出异常
CallerRunsPolicy:使用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里当前第一个任务,并重新提交当前任务。
DiscardPolicy:不处理。
当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如监控,记录日志或持久化不能处理的任务。
线程池执行任务流程
线程池按以下行为执行任务:
当线程数小于核心线程数时,创建线程并执行任务。
当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
当线程数大于等于核心线程数,且任务队列已满。若线程数小于最大线程数,创建线程并执行任务;若线程数到达最大线程数,则抛出异常,拒绝任务。
源码分析
ThreadPoolExecutor类继承关系
ThreadPoolExecutor类继承自AbstractExecutorService抽象类,AbstractExecutorService抽象类实现了ExecutorService接口,ExecutorService接口继承自Executor接口。核心执行方法是Executor接口的execute()方法,ExecutorService扩展了submit(),invokeAll(),invokeAny()方法,并在AbstractExecutorService中做了具体实现,这三个方法最终都会调用Executor接口的execute()方法,execute()方法在ThreadPoolExecutor中做了具体实现。
|
|
|
|
ThreadPoolExecutor核心属性
|
|
ThreadPoolExecutor的工作线程Worker
Worker与其说是工作线程,其实是管理工作线程,每个Worker内部的线程在run方法中通过不断的从任务队列中获取任务来不断的处理任务,自身继承自AQS,所以每个Worker自身也是一个锁,保护获取到的任务的执行,在锁状态意味着该Worker正在执行任务。除了创建Worker时提交的任务,其他提交过来的任务都是放入任务队列交给Workers去消费的。
|
|
ThreadPoolExecutor核心方法execute()方法源码分析
public void execute(Runnable command)
在将来某个时间执行给定任务。该方法直接返回不等待任务执行完成,异步处理任务。
|
|
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。一旦返回后,即取消尚未完成的任务。注意,可以正常地或通过抛出异常来终止已完成的任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
|
|
ThreadPoolExecutor关闭
public void shutdown()
关闭线程池,仍会处理任务队列中的任务,但是不接受新任务。如果已经关闭,则调用没有其他作用。
|
|
public List<Runnable>
shutdownNow()
尝试停止所有的活动执行任务、停止任务队列的处理,并返回等待执行的任务列表。并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。
|
|
ThreadPoolExecutor扩展点
ThreadPoolExecutor提供了几个protected的方法,在任务执行前,执行后,和线程池终止时执行。可以在自定义线程池中自由实现,用来记录和监控线程池的运行情况。
|
|