Android线程池ThreadPoolExecutor源码解析
一、ThreadPoolExecutor介绍
1、定义
ThreadPoolExecutor线程池本质就是缓存一定线程数量的区域(池子)
2、作用
实现对批量线程的统一管理、调度、分配和监控
复用线程
3、优点
降低因线程的创建和销毁带来的性能开销,可以复用线程池中的线程
提高线程响应速度和执行效率,复用线程就意味着省去了创建线程的过程;管理线程就意味着可以优化线程的执行顺序,避免大量线程间因互相抢占系统资源而出现阻塞的情况
提高对线程的管理度
4、一般直接创建Thread存在的问题
每次新创建线程或销毁线程对象会消耗系统资源,并且响应速度慢
线程缺乏统一的管理和调度,完全交由系统来调度,容易出现多个线程因抢占资源出现阻塞的情况。
5、ThreadPoolExecutor核心参数
ThreadPoolExecutor类主要有6个核心参数
参数 | 含义 | 参数说明 |
---|---|---|
corePoolSize | 核心线程数 | 默认情况下,核心线程数会一直存活下去,即使此时线程池处于空闲状态。 |
maximumPoolSize | 线程池所容纳的最大线程数 | 当活跃的线程数达到最大线程数后,后续任务就会出现阻塞 |
keepAliveTime | 非核心线程空闲状态超时时间 | 空闲时间超过keepAliveTime,非核心线程就会被回收(若设置allowCoreThreadTimeout为true,那么keepAliveTime也是核心线程空闲超时时间) |
unit | 指定keepAliveTime参数时间单位 | 常用的单位: TimeUnit.MILLISECONDS (毫秒)、TimeUnit.SECONDS (秒)、TimeUnit.MINUTES (分) |
workQueue | 任务队列 | 内部存储Runnable(任务)的队列,一个Runnable就是一个任务,通过线程池的execute方法传入,Runnable对象存储在workQueue队列中 |
threadFactory | 创建线程Thread对象工厂对象(是一个接口) | 主要是为线程池创建新的线程,它只有一个抽象方法Thread newThread(Runnable r) |
6、ThreadPoolExecutor类继承结构
二、Executor接口类
public interface Executor {
void execute(Runnable command);//execute抽象方法,执行一个Runnable任务
}
我们一般执行异步任务都会启动一个线程会去创建一个新的Thread, 然后调用start方法开启一个线程即可:
new Thread(new Runnable(){
@Override
public void run() {
//todo sth
}
}).start();
但是,我们可以利用Executor实现同步、异步两种调度器
//同步调度器
class SyncExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run();//注意,这里只是同步执行一个任务,没有启动任何线程,所以是同步的
}
}
class Task implements Runnable {
@Override
public void run() {
//todo sth
}
}
class Test {
public static void main(String[] args) {
Executor syncExecutor = new SyncExecutor();
syncExcutor.execute(new Task());//向同步调度器中提交一个Task任务
}
}
//异步调度器
class AsyncExecutor implements Executor {
@Override
public void execute(Runnable command) {
new Thread(command).start();//注意: 对于提交的任务,都开启一个新的线程去执行,所以每个任务都是独立运行在不同的工作线程中
}
}
class Task implements Runnable {
@Override
public void run() {
//todo sth
}
}
class Test {
public static void main(String[] args) {
Executor asyncExecutor = new AsyncExecutor();
asyncExecutor.execute(new Task());
}
}
其实,我们还可以组合两个Executor来使用,下面这个实现是将所有的任务都加到一个queue
中,然后从queue
中取任务,交给真正的执行器执行,这里采用synchronized
进行并发控制。
class SerialExecutor implements Executor {
//任务队列
private final BlockingQueue<Runnable> mTaskQueue = new LinkedBlockingQueue<>();
private final Executor mExecutor;
private Runnable mActiveTask;
public SerialExecutor(Executor executor) {
this.mExecutor = executor;
}
@Override
public synchronized void execute(Runnable command) {
mTaskQueue.offer(new Runnable(){
@Override
public void run() {
try {
command.run();//当前任务command执行
} finally {
scheduleNext();//当前任务command执行结束,开始自动调度下一个任务
}
}
});
if(mActiveTask == null) {//如果第一次mActiveTask为null, 就手动执行调度去任务队列中取任务
scheduleNext();
}
}
//调度执行下一个任务
private synchronized void scheduleNext() {
// mTaskQueue.poll()从队列中取出任务
if((mActiveTask = mTaskQueue.poll()) != null) {
mExecutor.execute(mActiveTask);
}
}
}
三、具有生命周期的ExecutorService接口
通过
Executors
线程池工具类创建的线程池一般都是使用ExecutorService
,它赋予了线程池更丰富的能力,比如生命周期和调用的接口方法。
public interface ExecutorService extends Executor {
void shutdown();//关闭线程池,不再接收新的任务
List<Runnable> shutdownNow();//关闭线程池,不再接收新的任务,尝试终止正在执行的所有任务
boolean isShutDown();//当前线程池是否是SHUTDOWN状态
boolean isTerminated();//调用了shutdown或shutdownNow后,所有任务都结束了,那么会返回true,这个方法必须在调用shutdown或shutdownNow后调用才会返回true
boolean awaitTermination(long timeout, TimeUnit unit) throw InterruptedExeception;//等待所有任务完成,并设置超时时间,一般会先调用shutdown或shutdownNow后,然后再调用这个方法等待所有线程任务执行完毕
<T> Future<T> submit(Callable<T> task);//提交一个Callable任务
<T> Future<T> submit(Runnable task, T result);//提交一个Runnable任务,第二个参数将会被封装到Future中,作为最后result的返回值,因为Runnable的run方法是不带返回值的。
Future<?> submit(Runnable task);//提交一个Runnable任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;//执行所有任务,返回一个Future的List集合
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;//执行所有任务,返回一个Future的List集合,可以设置超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;//只要其中一个任务结束了,就可以返回,返回执行完任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;//只要其中一个任务结束了,就可以返回,返回执行完任务的结果, 但是这个带超时时间,超过指定时间,就抛出TimeoutException异常
}
四、AbstractExecutorService抽象类
AbstractExecutorService
抽象类实现自ExecutorService
接口,然后在实现了子类所需的共有方法,比如newTaskFor
方法、sumbit
方法等. 针对ExecutorService接口做了部分实现
public abstract class AbstractExecutorService implements ExecutorService {
//将一个Runnable任务包装成RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
//将一个Callable任务包装成RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
//用于提交批量任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
//遍历所有的tasks
for (Callable<T> t : tasks) {
//将每个Callable任务包装成一个RunnableFuture
RunnableFuture<T> f = newTaskFor(t);
//并把它加入到futures集合中
futures.add(f);
//然后调用execute执行一次每个RunnableFuture
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException ignore) {}
catch (ExecutionException ignore) {}
}
}
return futures;
} catch (Throwable t) {
//如果出现异常就是取消所有任务
cancelAll(futures);
throw t;
}
}
//用于提交批量任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
final long nanos = unit.toNanos(timeout);
final long deadline = System.nanoTime() + nanos;
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
int j = 0;
timedOut: try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final int size = futures.size();
for (int i = 0; i < size; i++) {
if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
break timedOut;
execute((Runnable)futures.get(i));
}
for (; j < size; j++) {
Future<T> f = futures.get(j);
if (!f.isDone()) {
try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
catch (CancellationException ignore) {}
catch (ExecutionException ignore) {}
catch (TimeoutException timedOut) {
break timedOut;
}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
// Timed out before all the tasks could be completed; cancel remaining
cancelAll(futures, j);
return futures;
}
private static <T> void cancelAll(ArrayList<Future<T>> futures) {
cancelAll(futures, 0);
}
/** Cancels all futures with index at least j. */
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
for (int size = futures.size(); j < size; j++)
futures.get(j).cancel(true);
}
}
五、FutureTask任务包装类
我们在
ExecutorService
中有接触到FutureTask
,FutureTask
通过RunnableFuture
间接实现了Runnable
接口,所以每个外部任务的Runnable
一般会先包装成一个FutureTask
,然后提交给线程池.
我们都知道,Runnable
的run
是不会带返回值的,所以如果我们通过submit(task, result)
方法向线程池中提交任务的第二个参数就是作为任务的返回值。
public <T> Future<T> submit(Runnable task, T result)
其实这两个参数最终还是会被包装到一个Callable
接口实现类RunnableAdapter
对象中。
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
//将task和result包装成一个FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
//然后再将runnable和result最终包装成一个Callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
//Executors工具类中callable方法将task,result包装成一个RunnableAdapter对象
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//RunnableAdapter类实现了Callable接口
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();//task.run执行在子线程中
return result;//然后就返回传入result
}
}
六、ThreadPoolExecutor基本使用
ThreadPoolExecutor
类 = 线程池的真正实现类,开发者只需要根据不同的需求配置核心参数,从而实现自定义线程池使用线程池基本5步:
1、创建ThreadFactory线程工厂对象,用于生产新的线程Thread对象
2、创建任务队列对象BlockingQueue
3、创建线程池对象ThreadPoolExecutor
4、调用execute方法向线程池中提交任务或者调用submit提交任务,或者调用invokeAll提交批量任务
5、Activity销毁,调用shutdown方法,关闭线程池
class ThreadPoolActivity : AppCompatActivity() {
//1、创建ThreadFactory对象用于创建新的线程
private val mThreadFactory: ThreadFactory by lazy {
ThreadFactory { runnable ->
val mCount = AtomicInteger(1)
return@ThreadFactory Thread(runnable, "#ThreadPool-${mCount.incrementAndGet()}")
}
}
//2、创建任务队列对象
private val mPoolWorkQueue: BlockingQueue<Runnable> = ArrayBlockingQueue<Runnable>(128)
//3、创建线程池
private val mThreadPoolExecutor: ExecutorService by lazy {
//配置线程池参数:
// CORE_POOL_SIZE(核心线程数)、
// MAXIMUM_POOL_SIZE(线程可容纳的最大线程数)、
// KEEP_ALIVE_TIME(非核心线程空闲状态的超时时间)、
// TimeUnit.SECONDS(超时时间的单位)、
// mPoolWorkQueue(任务队列)、
// mThreadFactory(创建新线程的工厂对象)
ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
mPoolWorkQueue,
mThreadFactory
)
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_thread_pool)
//4、execute执行一个任务
mThreadPoolExecutor.execute {
Log.d(TAG, "current thread is ${Thread.currentThread().name}")
}
}
override fun onDestroy() {
super.onDestroy()
//5、关闭线程池
mThreadPoolExecutor.shutdown()
}
companion object {
private val CPU_COUNT = Runtime.getRuntime().availableProcessors()//CPU的数量
private val CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4))//线程池可容纳核心线程数量 2 <= CORE_POOL_SIZE <= 4
private val MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1//线程池可容纳最大线程数量 CPU_COUNT * 2 + 1
private const val KEEP_ALIVE_TIME: Long = 30//空闲超时时间
private const val TAG = "ThreadPoolActivity"
}
}
七、四种常用的线程池
实际上,实现线程池核心功能就是ThreadPoolExecutor,然而这4种线程池只是根据不同配置参数策略来对线程池进行分类。所以开发者可以根据不同需求来配置核心参数,从而实现自定义的线程池。
定长线程池(FixedThreadPool)
定时线程池(ScheduleThreadPool)
可缓存线程池(CachedThreadPool)
单线程化线程池(SingleThreadExecutor)
1、定长线程池(FixedThreadPool)
- 源码定义:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
参数说明:
1、核心线程数等于线程池最大线程数都为外部指定的数量
hThread
,只有核心线程;2、keepAliveTime超时时间为0,没有设置allowCoreThreadTimeout(true),也就是池内线程不会在空闲状态下被回收。
3、workQueue任务队列是不定长的,默认初始容量为长度Integer.MAX_VALUE
特点:
只有核心线程且不会被回收、线程数量固定、任务队列无大小限制(超出线程数任务就会放入队列等待)
应用场景:
控制线程最大并发数
使用例子:
class FixedThreadPoolActivity : AppCompatActivity() { private val mCPUCount = Runtime.getRuntime().availableProcessors()//获取的CPU数量 private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0) //1、创建一个定长数的线程池,只有核心线程数,核心线程数等于CPU的数量 private val mFixedThreadPool: ExecutorService by lazy { Executors.newFixedThreadPool(mCPUCount) { runnable -> return@newFixedThreadPool Thread( runnable, "#OtherThreadPool-${mSequenceGenerateNum.incrementAndGet()}-thread" ) } } //2、创建一个任务对象(Runnable对象)也就是需要执行某项任务 private val mTask: Runnable by lazy { Runnable { Log.d(TAG, "开始执行任务: current thread is ${Thread.currentThread().name}") } } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_other_thread_pool) //3、提交一个task mFixedThreadPool.submit(mTask) } override fun onDestroy() { super.onDestroy() //4、关闭线程池 mFixedThreadPool.shutdown() } companion object { private const val TAG = "FixedThreadPoolActivity" } }
2、定时线程池(ScheduledThreadPoolExecutor)
- 类继承关系图
源码定义
//指定核心线程数corePoolSize public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //指定核心线程数corePoolSize和Thread public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
特点:
核心线程数量固定、非核心线程数量无限制(闲置时马上回收),keepAliveTime只有10s
应用场景:
执行定时 / 周期性 任务
使用例子:
class ScheduledThreadPoolActivity : AppCompatActivity() { private val mCPUCount = Runtime.getRuntime().availableProcessors()//获取的CPU数量 private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0) private val mStartTime: Long by lazy { System.currentTimeMillis() } //1、创建定时线程池对象mScheduledThreadPool和设置线程池线程数量固定为mCPUCount private val mScheduledThreadPool: ScheduledExecutorService by lazy { Executors.newScheduledThreadPool(mCPUCount) { runnable -> return@newScheduledThreadPool Thread(runnable,"#ScheduledThreadPool-${mSequenceGenerateNum.incrementAndGet()}-thread") } } //2、创建一个任务对象(Runnable对象)也就是需要执行某项任务 private val mTask: Runnable by lazy { Runnable { Log.d(TAG, "执行任务: current thread is ${Thread.currentThread().name} --- time is ${(System.currentTimeMillis() - mStartTime)/1000}s ") } } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_scheduled_thread_pool) Log.d(TAG, "start time is $mStartTime") //3、 提交定时任务 // schedule: 系统启动后3s开始执行mTask,执行完就完事了 mScheduledThreadPool.schedule(mTask, 3L, TimeUnit.SECONDS) // scheduleAtFixedRate: 系统启动后3s开始第一次执行mTask,执行完后: 会周期性重复执行mTask, 任务与任务之间的间隔 = Math.max(mTask执行的时长, period设置周期时长) mScheduledThreadPool.scheduleAtFixedRate(mTask, 3L, 5L, TimeUnit.SECONDS) //scheduleWithFixedDelay: 系统启动后3s开始第一次执行mTask,执行完后: 会周期性重复执行mTask, 任务与任务之间的间隔 = mTask执行的时长 + delay设置周期时长 mScheduledThreadPool.scheduleWithFixedDelay(mTask, 3L, 5L, TimeUnit.SECONDS) } override fun onDestroy() { super.onDestroy() //4、关闭线程池 mScheduledThreadPool.shutdown() } companion object { private const val TAG = "ScheduledThreadPoolTest" } }
3、可缓存线程池(CachedThreadPool)
源码定义:
//默认核心线程数为0、全是非核心线程数,且数量不受限制;任务队列是同步队列SynchronuosQueue public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
特点:
只有非核心线程、线程数量不固定(可无限大)、灵活回收空闲线程(具备超时机制,全部回收时几乎不占系统资源)、新建线程(无线程可用时), 任何线程任务到来都会立刻执行,不需要等待
应用场景:
执行大量任务且耗时较少的线程任务
使用例子:
class CachedThreadPoolActivity : AppCompatActivity() { private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0) //1、创建可缓存线程池对象, 注意: 没有核心线程, // 完全就是利用了线程池最基本复用原理来达到缓存线程,缓存时间就是keepAlive时间,即非核心线程空闲存活的时间 // 当执行完第二个任务时,第一个任务已经执行完毕后,那么就会去复用第一个任务的线程,而不会重复创建新的线程 private val mCachedThreadPool: ExecutorService by lazy { Executors.newCachedThreadPool { runnable -> return@newCachedThreadPool Thread( runnable, "#CachedThreadPool-${mSequenceGenerateNum.incrementAndGet()}-thread" ) } } //2、创建好Runnable类线程对象和需要执行的任务 private val mTask: Runnable = Runnable { Log.d(TAG, "执行任务: current thread is ${Thread.currentThread().name} ") } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_cached_thread_pool) //3、向线程池提交任务: execute(runnable) mCachedThreadPool.execute(mTask) } override fun onDestroy() { super.onDestroy() //4、关闭线程池 mCachedThreadPool.shutdown() } companion object { private const val TAG = "ScheduledThreadPool" } }
4、单线程化线程池(SingleThreadExecutor)
类的继承关系
源码定义
//无需指定任何参数,内部采用FinalizableDelegatedExecutorService包装了一个只有1个核心线程的线程池,任务队列可以是无限制的。 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //指定threadFactory参数,内部采用FinalizableDelegatedExecutorService包装了一个只有1个核心线程的线程池,任务队列可以是无限的。 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
特点:
只有一个核心线程(保证所有任务按照指定顺序在一个线程中执行,不需要处理线程同步的问题)
应用场景:
不支持并发,因为只有一个子线程,比如一些影响主线程的操作,适合多个任务按照顺序串行在单线程中执行
使用例子:
class SingleThreadExecutorActivity : AppCompatActivity() { private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0) //1、创建单线程化线程池 private val mSingleThreadExecutor: ExecutorService by lazy { Executors.newSingleThreadExecutor{ runnable -> return@newSingleThreadExecutor Thread(runnable, "#SingleThreadExecutor-${mSequenceGenerateNum.incrementAndGet()}-thread") } } //2、创建好Runnable类线程对象和需要执行的任务 private val mTask: Runnable by lazy { Runnable { Log.d(TAG, "执行任务: current thread is ${Thread.currentThread().name} ") } } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_single_thread_executor) //3、提交线程任务 mSingleThreadExecutor.submit(mTask) } override fun onDestroy() { super.onDestroy() //4、关闭线程池 mSingleThreadExecutor.shutdown() } companion object { private const val TAG = "SingleThreadExecutor" } }
5、四种线程池的对比
线程池类型 | 池内 线程类型 | 池内 线程数量 | 处理方式 | 应用场景 |
---|---|---|---|---|
定长线程池(newFixedThreadPool) | 只有核心线程 | 固定数量,外部指定 | 1、核心线程处于空闲状态也不会被回收,除非线程被关闭 2、当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有空闲的线程。 3、任务队列无大小限制(超出任务会在任务队列中等待) |
控制线程的最大并发数 |
定时线程池(newScheduleThreadPoo) | 核心线程与非核心线程 | 1、核心线程固定 2、非核心线程 无限制 |
当非核心线程闲置时,则会被立即回收 | 执行定时/周期性任务 |
可缓存线程池(newCachedThreadPool) | 只有非核心线程 | 非核心线程数不固定且无限制 | 1、优先利用闲置线程处理新的任务(复用线程) 2、无闲置的线程可用时,即创建新的线程(即 任何线程的任务到来都会立刻执行,不需要等待) 3、灵活回收空闲线程(具备超时机制=60s,即空闲60s才会回收,如果在60s内有任务到来可以直接复用,全部回收几乎不占用资源) |
执行数量多,耗时少的线程任务 |
单线程化线程池(newSingleThreadPool) | 只有核心线程 | 1个 | 1、保证所有任务都按照指定串行执行 2、不需要处理线程同步的问题 |
单线程(不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作,文件操作等,适合多个任务串行排队在单线程执行) |
八、ThreadPoolExecutor内部原理
九、ThreadPoolExecutor中4种拒绝任务策略
1、AbortPolicy策略
这是一个默认策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个,
策略规则是:不管怎样, 丢弃新任务,直接抛出 RejectedExecutionException 异常
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
//直接抛出 RejectedExecutionException 异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2、CallerRunsPolicy策略
策略规则是:只要线程池没有被关闭,那么由提交任务的线程所处线程自己来执行这个任务。比如是主线程提交的任务,那么就由主线程来执行这个任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {//如果当前线程池没有关闭
r.run();//直接执行Runnable,相当于直接在提交任务当前线程执行任务
}
}
}
3、DiscardPolicy策略
策略规则是:不做任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//do nothing 不做处理,直接忽略
}
}
4、DiscardOldestPolicy策略
这个相对霸道一点,如果线程池没有被关闭的话,
策略规则是:把此时队列队头的第一个任务(也就是即将要执行)直接扔掉,然后提交这个任务到等待队列中,插入到队尾
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {//若当前线程池未关闭
e.getQueue().poll();//把当前任务队列中队头元素直接扔掉
e.execute(r);//再把当前任务提交到线程池中
}
}
}
十、ThreadPoolExecutor源码分析
1、解释几个参数和方法
在线程池中采用一个32位整数来存放线程的状态和当前线程池中的线程数,其中高3位用于存放线程池的状态,低29位表示线程数(2^29 - 1 = 536860911个线程数)
COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29
- 线程池容量CAPACITY二进制运算过程:
(1 << COUNT_BITS) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111 => 线程允许最大的数量CAPACITY为2^29 - 1 = 536860911
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
1 = 0000 0000 0000 0000 0000 0000 0000 0001
COUNT_BITS = 29
1 << 29 => 0010 0000 0000 0000 0000 0000 0000 0000
(1 << 29) - 1 => 0001 1111 1111 1111 1111 1111 1111 1111 //(高3位000 为线程池状态)
2、线程池的5种状态
- 1、线程池RUNNING状态二进制运算过程:
接收新的任务,处理等待任务队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
RUNNING = -1 << COUNT_BITS
-1 => 1111 1111 1111 1111 1111 1111 1111 1111
-1 << 29 => 1110 0000 0000 0000 0000 0000 0000 0000
=> 1110 0000 0000 0000 0000 0000 0000 0000 //state(高3位): 111 value(状态10进制值): -536870912
- 2、线程池SHUTDOWN状态二进制运算过程
不接受新的任务,但是会继续处理等待队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
SHUTDOWN = 0 << COUNT_BITS = 0
0 => 0000 0000 0000 0000 0000 0000 0000 0000
0 << 29 => 0000 0000 0000 0000 0000 0000 0000 0000
=> 0000 0000 0000 0000 0000 0000 0000 0000 //state(高3位): 000 value(状态10进制值): 0
- 3、线程池STOP状态二进制运算过程
不接受新的任务,不再处理等待队列中的任务,中断正在执行的任务线程
private static final int STOP = 1 << COUNT_BITS;
STOP = 1 << COUNT_BITS
1 => 0000 0000 0000 0000 0000 0000 0000 0001
1 << 29 => 0010 0000 0000 0000 0000 0000 0000 0000
=> 0010 0000 0000 0000 0000 0000 0000 0000//state(高3位): 001 value(状态10进制值): 536870912
- 4、线程池TIDYING状态二进制运算过程
所有的任务都销毁了,workCount线程数为0;线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
private static final int TIDYING = 2 << COUNT_BITS;
TIDYING = 2 << COUNT_BITS
2 => 0000 0000 0000 0000 0000 0000 0000 0010
2 << 29 => 0100 0000 0000 0000 0000 0000 0000 0000
=> 0100 0000 0000 0000 0000 0000 0000 0000//state(高3位): 010 value(状态10进制值): 1073741824
- 5、线程池TERMINATED状态二进制运算过程
terminated()钩子方法执行结束后,线程池就会变成这个状态
private static final int TERMINATED = 3 << COUNT_BITS;
TERMINATED = 3 << COUNT_BITS
3 => 0000 0000 0000 0000 0000 0000 0000 0011
3 << 29 => 0110 0000 0000 0000 0000 0000 0000 0000
=>0110 0000 0000 0000 0000 0000 0000 0000 //state(高3位): 011 value(状态10进制值): 1610612736
注意:
RUNNING值为-536870912,SHUTDOWN值为0,其他的都比0大,所以等于0也就是SHUTDOWN的状态下不能提交新的任务,大于0的时候就连正在执行的任务也需要中断,如果值小于0的时候线程池状态就是RUNNING
3、线程池状态转化图
4、ThreadPoolExecutor构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//7个参数: corePoolSize核心线程数
//maximumPoolSize最大线程数
//keepAliveTime(非核心线程超时时间、若allowCoreThreadTimeOut为true,那么它也是核心线程的超时时间)
//Unit(keepAliveTime单位)
//workQueue(任务队列)
//threadFactory(创建线程工厂类对象)
//handler(RejectedExecutionHandler拒绝任务策略)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
5、第一步,从execute入口方法开始,execute接收一个Runnable对象作为参数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//ctl初始值为默认RUNNING状态值为-536870912
private static int ctlOf(int rs, int wc) { return rs | wc; }//RUNNING | 0 = RUNNING;
private static int workerCountOf(int c) { return c & CAPACITY;}//workerCountOf(c) = RUNNING & CAPACITY =>
1110 0000 0000 0000 0000 0000 0000 0000
&
0001 1111 1111 1111 1111 1111 1111 1111
=
0000 0000 0000 0000 0000 0000 0000 0000
//所以初始线程池的数量为0
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();//任务为空,抛出空指针异常
//通过AtomicInteger对象ctl拿到它的值,ctl初始值为默认RUNNING状态值为-536870912
int c = ctl.get();
//通过workerCountOf拿到线程池中线程数量,最开始默认线程池中线程数为0
//----如果当前线程池线程数量 <(小于) 核心线程数corePoolSize----
if (workerCountOf(c) < corePoolSize) {
//执行addWorker方法,在内部创建一个新的核心线程来执行任务,当前任务command会作为这个线程的第一个任务(firstTask)执行,ture表示创建核心线程
if (addWorker(command, true))
return;//如果添加任务成功,那么这个执行方法就结束了,提交任务嘛,线程池已经接受这个任务了,说明它的职责也就结束了。addWorker返回false表示线程池不允许提交任务
c = ctl.get();
}
//----如果当前线程池数量 >=(超过) 核心线程数corePoolSize, 会执行到此处---
//当前线程处于RUNNING状态 &&(并且) 尝试offer任务排队成功(offer返回true表示未满,返回false表示队列满了)
if (isRunning(c) && workQueue.offer(command)) {
//尽管任务command已经offer排队成功,但是有可能在排队成功后就出现了线程池关闭情况,所以需要再次检查
int recheck = ctl.get();
//如果再次检查发现当前线程处于非RUNNING,并且成功地把当前这个任务从队列删掉,那么就拒绝当前任务
if (!isRunning(recheck) && remove(command))
reject(command);//拒绝当前任务
//如果当前线程池线程的数量为0,也就是核心线程数corePoolSize为0,全部都是非核心线程时,那么就创建非核心线程,来执行队列中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果当前线程池数量 >=(超过) 核心线程数corePoolSize &&(并且) 队列满了,就创建非核心线程来执行任务command
else if (!addWorker(command, false))
//如果非核心线程创建失败,也就是当前线程池线程数超过最大线程数,就执行拒绝任务
reject(command);
}
6、第二步,再分析一个特别重要的方法addWorker(Runnable firstTask, boolean core)
7、第三步,内部类ThreadPoolExecutor.Worker实现
8、第四步,通过addWorker方法,创建了一个Worker对象,并且启动了一个新的线程来执行,线程启动后,Worker中的run就会被回调,因为Worker实现Runnable接口,创建Thread的时把自己作为Runnable参数传入来创建。
/** Delegates main run loop to outer runWorker. */
public void run() {
////注意: 这个run是执行在线程池中工作线程中,完成了线程的切换,然后委托调用它的外部类ThreadPoolExecutor中的runWorker(this)方法,并把Worker对象传递出去
runWorker(this);
}
9、第五步,执行runWorker(worker)方法,运行在执行工作线程环境中
10、第六步,继续看下getTask方法是如何获取任务
getTask获取任务主要有三种可能:
1、当只有核心线程且当前线程数超过了corePoolSize核心线程数,核心线程不会被回收,getTask就会一直阻塞直到有任务返回,否则就会一直等待任务
2、超时退出,如果keepAliveTime起作用的时候,在keepAliveTime时间内都没有任务,那么就会执行关闭
3、如果触发以下条件直接返回null:
当前线程池中超过了maximumPoolSize最大线程数;
线程池处于SHUTDOWN状态,而且workQueue是空的,不再接收新的任务
线程池处于STOP状态,不接受新的任务,且还要终止执行workQueue中的任务
十一、总结
1、核心口诀:(4、4、5、7)
4: Executors类中4种内置线程池: newFixedThreadPool、newScheduleThreadPool、newCachedThreadPool、newSingleThreadPool
4: 4个拒绝策略: AbortPolicy、CallRunsPolicy、DiscardPolicy、DiscardOldestPolicy
5: 线程池5种状态: Running、Shutdown、Stop、Tidying、Terminated
7: 7个核心参数: corePoolSize(最大核心线程数)、maximumPoolSize(线程池最大线程数)、keepAliveTime(非核心线程超时时间、若allowCoreThreadTimeOut为true,那么它也是核心线程的超时时间)、Unit(keepAliveTime单位)、workQueue(任务队列)、threadFactory(创建线程工厂类对象)、handler(RejectedExecutionHandler拒绝任务策略)
2、Java线程池中有哪些关键属性?
在Java线程池中主要的核心实现类是ThreadPoolExecutor, 它的构造器关键参数主要6个:
- 1、corePoolSize: 核心线程数,核心线程默认空闲状态不会被回收,除非设置allowCoreThreadTimeOut为true
- 2、maximumPoolSize: 线程池允许的最大线程数,当超过该maximumPoolSize,新的任务就会被拒绝,而处于corePoolSize和maximumPoolSize之间的非核心线程在超过keepAliveTime就会被回收
- 3、keepAliveTime: 默认是针对非核心线程空闲超时时间,超过该时间就会被回收,如果设置了allowCoreThreadTimeOut为true,该规则同样是适用于核心线程,最后会执行关闭这些线程的操作。
- 4、workQueue: 任务队列,用于存放任务,如果当前线程数超过了coreThreadPool核心线程数,就会把任务添加到任务队列,线程池中的线程就会负责到队列中拉取任务。
- 5、threadFactory: 创建新线程的工厂类
- 6、RejectedExecutionHandler: 拒绝任务策略抽象接口,用于处理当线程池不能执行此任务时的情况,主要有四种策略实现: AbortPolicy(默认)、CallerRunPolicy、DiscardPolicy、DiscardOldestPolicy
3、说说线程池中的线程创建时机?
- 1、如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
- 2、如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
- 3、如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
4、Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 构造出来的线程池有什么差别?
- 1、newFixedThreadPool只有核心线程,且核心线程数固定,采用的是BlockingQueue任务队列,一般用于控制并发数场景
- 2、而newCachedThreadPool()没有核心线程,只有非核心线程且线程数量无限制,采用的是SynchronousQueue同步任务队列,keepAliveTime空闲状态超时时间为60s,所以回收线程池几乎不占用系统资源,会充分利用空闲线程,一般用于大量耗时较少的任务。
5、任务执行过程中发生异常怎么处理?
如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。
6、什么时候会执行拒绝策略?
拒绝任务主要是两种情况下会执行拒绝任务操作
- 1、线程池中的线程数大于等于corePoolSize时,此时任务需要把它加入到任务队列中,任务入队成功后,但是此时可能线程池状态为非RUNNING运行状态,此时就任务还没有出队列被执行,这时候需要把它从队列中remove成功后就执行reject(command)拒绝策略
- 2、线程池中的线程数大于等于corePoolSize核心线程数,将任务加入到任务队列中,但是此时队列已经满了,就需要去创建非核心线程,但是此时线程数超过了maximumPoolSize最大线程数,就执行拒绝任务策略。