Android线程池ThreadPoolExecutor源码解析

Android线程池ThreadPoolExecutor源码解析

一、ThreadPoolExecutor介绍

1、定义

ThreadPoolExecutor线程池本质就是缓存一定线程数量的区域(池子)

2、作用

  • 实现对批量线程的统一管理、调度、分配和监控

  • 复用线程

3、优点

  • 降低因线程的创建和销毁带来的性能开销,可以复用线程池中的线程

  • 提高线程响应速度和执行效率,复用线程就意味着省去了创建线程的过程;管理线程就意味着可以优化线程的执行顺序,避免大量线程间因互相抢占系统资源而出现阻塞的情况

  • 提高对线程的管理度

4、一般直接创建Thread存在的问题

  • 每次新创建线程或销毁线程对象会消耗系统资源,并且响应速度慢

  • 线程缺乏统一的管理和调度,完全交由系统来调度,容易出现多个线程因抢占资源出现阻塞的情况。

5、ThreadPoolExecutor核心参数

ThreadPoolExecutor类主要有6个核心参数

参数 含义 参数说明
corePoolSize 核心线程数 默认情况下,核心线程数会一直存活下去,即使此时线程池处于空闲状态。
maximumPoolSize 线程池所容纳的最大线程数 当活跃的线程数达到最大线程数后,后续任务就会出现阻塞
keepAliveTime 非核心线程空闲状态超时时间 空闲时间超过keepAliveTime,非核心线程就会被回收(若设置allowCoreThreadTimeout为true,那么keepAliveTime也是核心线程空闲超时时间)
unit 指定keepAliveTime参数时间单位 常用的单位: TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)
workQueue 任务队列 内部存储Runnable(任务)的队列,一个Runnable就是一个任务,通过线程池的execute方法传入,Runnable对象存储在workQueue队列中
threadFactory 创建线程Thread对象工厂对象(是一个接口) 主要是为线程池创建新的线程,它只有一个抽象方法Thread newThread(Runnable r)

6、ThreadPoolExecutor类继承结构

二、Executor接口类

public interface Executor {
    void execute(Runnable command);//execute抽象方法,执行一个Runnable任务
}

我们一般执行异步任务都会启动一个线程会去创建一个新的Thread, 然后调用start方法开启一个线程即可:

new Thread(new Runnable(){
    @Override
    public void run() {
        //todo sth
    }
}).start();

但是,我们可以利用Executor实现同步、异步两种调度器

//同步调度器
class SyncExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();//注意,这里只是同步执行一个任务,没有启动任何线程,所以是同步的
    }
}

class Task implements Runnable {
    @Override
    public void run() {
        //todo sth
    }
}

class Test {
    public static void main(String[] args) {
        Executor syncExecutor = new SyncExecutor();
        syncExcutor.execute(new Task());//向同步调度器中提交一个Task任务
    }
}

//异步调度器
class AsyncExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();//注意: 对于提交的任务,都开启一个新的线程去执行,所以每个任务都是独立运行在不同的工作线程中
    }
}

class Task implements Runnable {
    @Override
    public void run() {
        //todo sth
    }
}

class Test {
    public static void main(String[] args) {
        Executor asyncExecutor = new AsyncExecutor();
        asyncExecutor.execute(new Task());
    }
}

其实,我们还可以组合两个Executor来使用,下面这个实现是将所有的任务都加到一个queue中,然后从queue中取任务,交给真正的执行器执行,这里采用synchronized进行并发控制。

class SerialExecutor implements Executor {
    //任务队列
    private final BlockingQueue<Runnable> mTaskQueue = new LinkedBlockingQueue<>();
    private final Executor mExecutor;
    private Runnable mActiveTask;

    public SerialExecutor(Executor executor) {
        this.mExecutor = executor;
    }

    @Override
    public synchronized void execute(Runnable command) {
           mTaskQueue.offer(new Runnable(){
               @Override
               public void run() {
                  try {
                      command.run();//当前任务command执行
                  } finally {
                      scheduleNext();//当前任务command执行结束,开始自动调度下一个任务
                  }
               }
           });

           if(mActiveTask == null) {//如果第一次mActiveTask为null, 就手动执行调度去任务队列中取任务
               scheduleNext();
           }                 
    }

    //调度执行下一个任务 
    private synchronized void scheduleNext() {
        // mTaskQueue.poll()从队列中取出任务
        if((mActiveTask = mTaskQueue.poll()) != null) {
            mExecutor.execute(mActiveTask);
        }
    }
}

三、具有生命周期的ExecutorService接口

通过Executors线程池工具类创建的线程池一般都是使用ExecutorService,它赋予了线程池更丰富的能力,比如生命周期和调用的接口方法。

public interface ExecutorService extends Executor {
    void shutdown();//关闭线程池,不再接收新的任务

    List<Runnable> shutdownNow();//关闭线程池,不再接收新的任务,尝试终止正在执行的所有任务

    boolean isShutDown();//当前线程池是否是SHUTDOWN状态

    boolean isTerminated();//调用了shutdown或shutdownNow后,所有任务都结束了,那么会返回true,这个方法必须在调用shutdown或shutdownNow后调用才会返回true

    boolean awaitTermination(long timeout, TimeUnit unit) throw InterruptedExeception;//等待所有任务完成,并设置超时时间,一般会先调用shutdown或shutdownNow后,然后再调用这个方法等待所有线程任务执行完毕

    <T> Future<T> submit(Callable<T> task);//提交一个Callable任务

    <T> Future<T> submit(Runnable task, T result);//提交一个Runnable任务,第二个参数将会被封装到Future中,作为最后result的返回值,因为Runnable的run方法是不带返回值的。

    Future<?> submit(Runnable task);//提交一个Runnable任务

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;//执行所有任务,返回一个Future的List集合

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;//执行所有任务,返回一个Future的List集合,可以设置超时时间

    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;//只要其中一个任务结束了,就可以返回,返回执行完任务的结果

    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;//只要其中一个任务结束了,就可以返回,返回执行完任务的结果, 但是这个带超时时间,超过指定时间,就抛出TimeoutException异常 
 }

四、AbstractExecutorService抽象类

AbstractExecutorService抽象类实现自ExecutorService接口,然后在实现了子类所需的共有方法,比如newTaskFor方法、sumbit方法等. 针对ExecutorService接口做了部分实现

public abstract class AbstractExecutorService implements ExecutorService {
    //将一个Runnable任务包装成RunnableFuture
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    //将一个Callable任务包装成RunnableFuture
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    //用于提交批量任务
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
            //遍历所有的tasks
            for (Callable<T> t : tasks) {
                //将每个Callable任务包装成一个RunnableFuture
                RunnableFuture<T> f = newTaskFor(t);
                //并把它加入到futures集合中
                futures.add(f);
                //然后调用execute执行一次每个RunnableFuture
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ignore) {}
                    catch (ExecutionException ignore) {}
                }
            }
            return futures;
        } catch (Throwable t) {
            //如果出现异常就是取消所有任务 
            cancelAll(futures);
            throw t;
        }
    }

    //用于提交批量任务    
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        final long nanos = unit.toNanos(timeout);
        final long deadline = System.nanoTime() + nanos;
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        int j = 0;
        timedOut: try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final int size = futures.size();

            for (int i = 0; i < size; i++) {
                if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
                    break timedOut;
                execute((Runnable)futures.get(i));
            }

            for (; j < size; j++) {
                Future<T> f = futures.get(j);
                if (!f.isDone()) {
                    try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
                    catch (CancellationException ignore) {}
                    catch (ExecutionException ignore) {}
                    catch (TimeoutException timedOut) {
                        break timedOut;
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    private static <T> void cancelAll(ArrayList<Future<T>> futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}

五、FutureTask任务包装类

我们在ExecutorService中有接触到FutureTaskFutureTask通过RunnableFuture间接实现了Runnable接口,所以每个外部任务的Runnable一般会先包装成一个FutureTask,然后提交给线程池.

我们都知道,Runnablerun是不会带返回值的,所以如果我们通过submit(task, result)方法向线程池中提交任务的第二个参数就是作为任务的返回值。

public <T> Future<T> submit(Runnable task, T result)

其实这两个参数最终还是会被包装到一个Callable接口实现类RunnableAdapter对象中。

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    //将task和result包装成一个FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    //然后再将runnable和result最终包装成一个Callable
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    //Executors工具类中callable方法将task,result包装成一个RunnableAdapter对象
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

   //RunnableAdapter类实现了Callable接口
   private static final class RunnableAdapter<T> implements Callable<T> {
        private final Runnable task;
        private final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();//task.run执行在子线程中
            return result;//然后就返回传入result
        }
   }

六、ThreadPoolExecutor基本使用

ThreadPoolExecutor类 = 线程池的真正实现类,开发者只需要根据不同的需求配置核心参数,从而实现自定义线程池

使用线程池基本5步:

  • 1、创建ThreadFactory线程工厂对象,用于生产新的线程Thread对象

  • 2、创建任务队列对象BlockingQueue

  • 3、创建线程池对象ThreadPoolExecutor

  • 4、调用execute方法向线程池中提交任务或者调用submit提交任务,或者调用invokeAll提交批量任务

  • 5、Activity销毁,调用shutdown方法,关闭线程池

class ThreadPoolActivity : AppCompatActivity() {

    //1、创建ThreadFactory对象用于创建新的线程
    private val mThreadFactory: ThreadFactory by lazy {
        ThreadFactory { runnable ->
            val mCount = AtomicInteger(1)
            return@ThreadFactory Thread(runnable, "#ThreadPool-${mCount.incrementAndGet()}")
        }
    }

    //2、创建任务队列对象
    private val mPoolWorkQueue: BlockingQueue<Runnable> = ArrayBlockingQueue<Runnable>(128)

    //3、创建线程池
    private val mThreadPoolExecutor: ExecutorService by lazy {
        //配置线程池参数:
        // CORE_POOL_SIZE(核心线程数)、
        // MAXIMUM_POOL_SIZE(线程可容纳的最大线程数)、
        // KEEP_ALIVE_TIME(非核心线程空闲状态的超时时间)、
        // TimeUnit.SECONDS(超时时间的单位)、
        // mPoolWorkQueue(任务队列)、
        // mThreadFactory(创建新线程的工厂对象)
        ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAXIMUM_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            mPoolWorkQueue,
            mThreadFactory
        )
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_thread_pool)

        //4、execute执行一个任务
        mThreadPoolExecutor.execute {
            Log.d(TAG, "current thread is ${Thread.currentThread().name}")
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        //5、关闭线程池
        mThreadPoolExecutor.shutdown()
    }

    companion object {
        private val CPU_COUNT = Runtime.getRuntime().availableProcessors()//CPU的数量
        private val CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4))//线程池可容纳核心线程数量 2 <= CORE_POOL_SIZE <= 4
        private val MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1//线程池可容纳最大线程数量 CPU_COUNT * 2 + 1
        private const val KEEP_ALIVE_TIME: Long = 30//空闲超时时间
        private const val TAG = "ThreadPoolActivity"
    }
}

七、四种常用的线程池

实际上,实现线程池核心功能就是ThreadPoolExecutor,然而这4种线程池只是根据不同配置参数策略来对线程池进行分类。所以开发者可以根据不同需求来配置核心参数,从而实现自定义的线程池。

  • 定长线程池(FixedThreadPool)

  • 定时线程池(ScheduleThreadPool)

  • 可缓存线程池(CachedThreadPool)

  • 单线程化线程池(SingleThreadExecutor)

1、定长线程池(FixedThreadPool)

  • 源码定义:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • 参数说明:

    1、核心线程数等于线程池最大线程数都为外部指定的数量hThread,只有核心线程;

    2、keepAliveTime超时时间为0,没有设置allowCoreThreadTimeout(true),也就是池内线程不会在空闲状态下被回收。

    3、workQueue任务队列是不定长的,默认初始容量为长度Integer.MAX_VALUE

  • 特点:

    只有核心线程且不会被回收、线程数量固定、任务队列无大小限制(超出线程数任务就会放入队列等待)

  • 应用场景:

    控制线程最大并发数

  • 使用例子:

    class FixedThreadPoolActivity : AppCompatActivity() {
        private val mCPUCount = Runtime.getRuntime().availableProcessors()//获取的CPU数量
        private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0)
        //1、创建一个定长数的线程池,只有核心线程数,核心线程数等于CPU的数量
        private val mFixedThreadPool: ExecutorService by lazy {
            Executors.newFixedThreadPool(mCPUCount) { runnable ->
                return@newFixedThreadPool Thread(
                    runnable, "#OtherThreadPool-${mSequenceGenerateNum.incrementAndGet()}-thread"
                )
            }
        }
    
        //2、创建一个任务对象(Runnable对象)也就是需要执行某项任务
        private val mTask: Runnable by lazy {
            Runnable {
                Log.d(TAG, "开始执行任务: current thread is ${Thread.currentThread().name}")
            }
        }
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_other_thread_pool)
            //3、提交一个task
            mFixedThreadPool.submit(mTask)
        }
    
        override fun onDestroy() {
            super.onDestroy()
            //4、关闭线程池
            mFixedThreadPool.shutdown()
        }
    
        companion object {
            private const val TAG = "FixedThreadPoolActivity"
        }
    }

2、定时线程池(ScheduledThreadPoolExecutor)

  • 类继承关系图

  • 源码定义

        //指定核心线程数corePoolSize
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        //指定核心线程数corePoolSize和Thread
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }

  • 特点:

    核心线程数量固定、非核心线程数量无限制(闲置时马上回收),keepAliveTime只有10s

  • 应用场景:

    执行定时 / 周期性 任务

  • 使用例子:

    class ScheduledThreadPoolActivity : AppCompatActivity() {
        private val mCPUCount = Runtime.getRuntime().availableProcessors()//获取的CPU数量
        private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0)
        private val mStartTime: Long by lazy {
            System.currentTimeMillis()
        }
        //1、创建定时线程池对象mScheduledThreadPool和设置线程池线程数量固定为mCPUCount
        private val mScheduledThreadPool: ScheduledExecutorService by lazy {
            Executors.newScheduledThreadPool(mCPUCount) { runnable ->
                return@newScheduledThreadPool Thread(runnable,"#ScheduledThreadPool-${mSequenceGenerateNum.incrementAndGet()}-thread")
            }
        }
    
        //2、创建一个任务对象(Runnable对象)也就是需要执行某项任务
        private val mTask: Runnable by lazy {
            Runnable {
                Log.d(TAG, "执行任务: current thread is ${Thread.currentThread().name} --- time is ${(System.currentTimeMillis() - mStartTime)/1000}s ")
            }
        }
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_scheduled_thread_pool)
            Log.d(TAG, "start time is $mStartTime")
            //3、 提交定时任务
            // schedule: 系统启动后3s开始执行mTask,执行完就完事了
            mScheduledThreadPool.schedule(mTask, 3L, TimeUnit.SECONDS)
            // scheduleAtFixedRate: 系统启动后3s开始第一次执行mTask,执行完后: 会周期性重复执行mTask, 任务与任务之间的间隔 = Math.max(mTask执行的时长, period设置周期时长)
            mScheduledThreadPool.scheduleAtFixedRate(mTask, 3L, 5L, TimeUnit.SECONDS)
            //scheduleWithFixedDelay: 系统启动后3s开始第一次执行mTask,执行完后: 会周期性重复执行mTask, 任务与任务之间的间隔 = mTask执行的时长 + delay设置周期时长
            mScheduledThreadPool.scheduleWithFixedDelay(mTask, 3L, 5L, TimeUnit.SECONDS)
        }
    
        override fun onDestroy() {
            super.onDestroy()
            //4、关闭线程池
            mScheduledThreadPool.shutdown()
        }
    
        companion object {
            private const val TAG = "ScheduledThreadPoolTest"
        }
    }

3、可缓存线程池(CachedThreadPool)

  • 源码定义:

    //默认核心线程数为0、全是非核心线程数,且数量不受限制;任务队列是同步队列SynchronuosQueue
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
  • 特点:

    只有非核心线程、线程数量不固定(可无限大)、灵活回收空闲线程(具备超时机制,全部回收时几乎不占系统资源)、新建线程(无线程可用时), 任何线程任务到来都会立刻执行,不需要等待

  • 应用场景:

    执行大量任务且耗时较少的线程任务

  • 使用例子:

    class CachedThreadPoolActivity : AppCompatActivity() {
    
        private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0)
    
        //1、创建可缓存线程池对象, 注意: 没有核心线程,
        // 完全就是利用了线程池最基本复用原理来达到缓存线程,缓存时间就是keepAlive时间,即非核心线程空闲存活的时间
        // 当执行完第二个任务时,第一个任务已经执行完毕后,那么就会去复用第一个任务的线程,而不会重复创建新的线程
        private val mCachedThreadPool: ExecutorService by lazy {
            Executors.newCachedThreadPool { runnable ->
                return@newCachedThreadPool Thread(
                    runnable,
                    "#CachedThreadPool-${mSequenceGenerateNum.incrementAndGet()}-thread"
                )
            }
        }
    
        //2、创建好Runnable类线程对象和需要执行的任务
        private val mTask: Runnable = Runnable {
            Log.d(TAG, "执行任务: current thread is ${Thread.currentThread().name} ")
        }
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_cached_thread_pool)
            //3、向线程池提交任务: execute(runnable)
            mCachedThreadPool.execute(mTask)
        }
    
        override fun onDestroy() {
            super.onDestroy()
            //4、关闭线程池
            mCachedThreadPool.shutdown()
        }
    
        companion object {
            private const val TAG = "ScheduledThreadPool"
        }
    }

4、单线程化线程池(SingleThreadExecutor)

  • 类的继承关系

  • 源码定义

        //无需指定任何参数,内部采用FinalizableDelegatedExecutorService包装了一个只有1个核心线程的线程池,任务队列可以是无限制的。
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        //指定threadFactory参数,内部采用FinalizableDelegatedExecutorService包装了一个只有1个核心线程的线程池,任务队列可以是无限的。
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
  • 特点:

    只有一个核心线程(保证所有任务按照指定顺序在一个线程中执行,不需要处理线程同步的问题)

  • 应用场景:

    不支持并发,因为只有一个子线程,比如一些影响主线程的操作,适合多个任务按照顺序串行在单线程中执行

  • 使用例子:

    class SingleThreadExecutorActivity : AppCompatActivity() {
        private val mSequenceGenerateNum: AtomicInteger = AtomicInteger(0)
        //1、创建单线程化线程池
        private val mSingleThreadExecutor: ExecutorService by lazy {
            Executors.newSingleThreadExecutor{  runnable ->
                return@newSingleThreadExecutor Thread(runnable, "#SingleThreadExecutor-${mSequenceGenerateNum.incrementAndGet()}-thread")
            }
        }
        //2、创建好Runnable类线程对象和需要执行的任务
        private val mTask: Runnable by lazy {
            Runnable {
                Log.d(TAG, "执行任务: current thread is ${Thread.currentThread().name} ")
            }
        }
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_single_thread_executor)
            //3、提交线程任务
            mSingleThreadExecutor.submit(mTask)
        }
    
        override fun onDestroy() {
            super.onDestroy()
            //4、关闭线程池
            mSingleThreadExecutor.shutdown()
        }
    
        companion object {
            private const val TAG = "SingleThreadExecutor"
        }
    }

5、四种线程池的对比

线程池类型 池内 线程类型 池内 线程数量 处理方式 应用场景
定长线程池(newFixedThreadPool) 只有核心线程 固定数量,外部指定 1、核心线程处于空闲状态也不会被回收,除非线程被关闭
2、当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有空闲的线程。
3、任务队列无大小限制(超出任务会在任务队列中等待)
控制线程的最大并发数
定时线程池(newScheduleThreadPoo) 核心线程与非核心线程 1、核心线程固定
2、非核心线程 无限制
当非核心线程闲置时,则会被立即回收 执行定时/周期性任务
可缓存线程池(newCachedThreadPool) 只有非核心线程 非核心线程数不固定且无限制 1、优先利用闲置线程处理新的任务(复用线程)
2、无闲置的线程可用时,即创建新的线程(即 任何线程的任务到来都会立刻执行,不需要等待)
3、灵活回收空闲线程(具备超时机制=60s,即空闲60s才会回收,如果在60s内有任务到来可以直接复用,全部回收几乎不占用资源)
执行数量多,耗时少的线程任务
单线程化线程池(newSingleThreadPool) 只有核心线程 1个 1、保证所有任务都按照指定串行执行
2、不需要处理线程同步的问题
单线程(不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作,文件操作等,适合多个任务串行排队在单线程执行)

八、ThreadPoolExecutor内部原理

九、ThreadPoolExecutor中4种拒绝任务策略

1、AbortPolicy策略

这是一个默认策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个,
策略规则是:不管怎样, 丢弃新任务,直接抛出 RejectedExecutionException 异常

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        //直接抛出 RejectedExecutionException 异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

2、CallerRunsPolicy策略

策略规则是:只要线程池没有被关闭,那么由提交任务的线程所处线程自己来执行这个任务。比如是主线程提交的任务,那么就由主线程来执行这个任务

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {//如果当前线程池没有关闭
                r.run();//直接执行Runnable,相当于直接在提交任务当前线程执行任务
            }
        }
    }

3、DiscardPolicy策略

策略规则是:不做任何处理,直接忽略掉这个任务

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //do nothing 不做处理,直接忽略
        }
    }

4、DiscardOldestPolicy策略

这个相对霸道一点,如果线程池没有被关闭的话
策略规则是:把此时队列队头的第一个任务(也就是即将要执行)直接扔掉,然后提交这个任务到等待队列中,插入到队尾

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {//若当前线程池未关闭
                e.getQueue().poll();//把当前任务队列中队头元素直接扔掉
                e.execute(r);//再把当前任务提交到线程池中
            }
        }
    }

十、ThreadPoolExecutor源码分析

1、解释几个参数和方法

在线程池中采用一个32位整数来存放线程的状态和当前线程池中的线程数,其中高3位用于存放线程池的状态,低29位表示线程数(2^29 - 1 = 536860911个线程数)

COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29

  • 线程池容量CAPACITY二进制运算过程:

(1 << COUNT_BITS) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111 => 线程允许最大的数量CAPACITY为2^29 - 1 = 536860911

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
1 = 0000 0000 0000 0000 0000 0000 0000 0001 
COUNT_BITS = 29 

1 << 29 => 0010 0000 0000 0000 0000 0000 0000 0000
(1 << 29) - 1 => 0001 1111 1111 1111 1111 1111 1111 1111 //(高3位000 为线程池状态)

2、线程池的5种状态

  • 1、线程池RUNNING状态二进制运算过程:

接收新的任务,处理等待任务队列中的任务

 private static final int RUNNING    = -1 << COUNT_BITS;
RUNNING = -1 << COUNT_BITS
-1 => 1111 1111 1111 1111 1111 1111 1111 1111
-1 << 29 => 1110 0000 0000 0000 0000 0000 0000 0000
=> 1110 0000 0000 0000 0000 0000 0000 0000 //state(高3位): 111  value(状态10进制值): -536870912
  • 2、线程池SHUTDOWN状态二进制运算过程

不接受新的任务,但是会继续处理等待队列中的任务

private static final int SHUTDOWN = 0 << COUNT_BITS;
SHUTDOWN = 0 << COUNT_BITS = 0
0 => 0000 0000 0000 0000 0000 0000 0000 0000 
0 << 29 => 0000 0000 0000 0000 0000 0000 0000 0000
=> 0000 0000 0000 0000 0000 0000 0000 0000 //state(高3位): 000 value(状态10进制值): 0
  • 3、线程池STOP状态二进制运算过程

不接受新的任务,不再处理等待队列中的任务,中断正在执行的任务线程

 private static final int STOP       =  1 << COUNT_BITS;
STOP   =  1 << COUNT_BITS
1 => 0000 0000 0000 0000 0000 0000 0000 0001 
1 << 29 => 0010 0000 0000 0000 0000 0000 0000 0000
=> 0010 0000 0000 0000 0000 0000 0000 0000//state(高3位): 001  value(状态10进制值): 536870912
  • 4、线程池TIDYING状态二进制运算过程

所有的任务都销毁了,workCount线程数为0;线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()

 private static final int TIDYING    =  2 << COUNT_BITS;
TIDYING   =  2 << COUNT_BITS
2 => 0000 0000 0000 0000 0000 0000 0000 0010 
2 << 29 => 0100 0000 0000 0000 0000 0000 0000 0000
=> 0100 0000 0000 0000 0000 0000 0000 0000//state(高3位): 010  value(状态10进制值): 1073741824 
  • 5、线程池TERMINATED状态二进制运算过程

terminated()钩子方法执行结束后,线程池就会变成这个状态

 private static final int TERMINATED =  3 << COUNT_BITS;
TERMINATED   =  3 << COUNT_BITS
3 => 0000 0000 0000 0000 0000 0000 0000 0011 
3 << 29 => 0110 0000 0000 0000 0000 0000 0000 0000 
=>0110 0000 0000 0000 0000 0000 0000 0000 //state(高3位): 011 value(状态10进制值): 1610612736 

注意:

RUNNING值为-536870912,SHUTDOWN值为0,其他的都比0大,所以等于0也就是SHUTDOWN的状态下不能提交新的任务,大于0的时候就连正在执行的任务也需要中断,如果值小于0的时候线程池状态就是RUNNING

3、线程池状态转化图

4、ThreadPoolExecutor构造器

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    //7个参数: corePoolSize核心线程数
    //maximumPoolSize最大线程数
    //keepAliveTime(非核心线程超时时间、若allowCoreThreadTimeOut为true,那么它也是核心线程的超时时间)
    //Unit(keepAliveTime单位)
    //workQueue(任务队列)
    //threadFactory(创建线程工厂类对象)
    //handler(RejectedExecutionHandler拒绝任务策略)
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

5、第一步,从execute入口方法开始,execute接收一个Runnable对象作为参数

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//ctl初始值为默认RUNNING状态值为-536870912
    private static int ctlOf(int rs, int wc) { return rs | wc; }//RUNNING | 0 = RUNNING; 
    private static int workerCountOf(int c) { return c & CAPACITY;}//workerCountOf(c) = RUNNING & CAPACITY => 

    1110 0000 0000 0000 0000 0000 0000 0000 
    &
    0001 1111 1111 1111 1111 1111 1111 1111
    =
    0000 0000 0000 0000 0000 0000 0000 0000
    //所以初始线程池的数量为0

    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();//任务为空,抛出空指针异常
        //通过AtomicInteger对象ctl拿到它的值,ctl初始值为默认RUNNING状态值为-536870912
        int c = ctl.get();
        //通过workerCountOf拿到线程池中线程数量,最开始默认线程池中线程数为0
        //----如果当前线程池线程数量 <(小于) 核心线程数corePoolSize----
        if (workerCountOf(c) < corePoolSize) {
            //执行addWorker方法,在内部创建一个新的核心线程来执行任务,当前任务command会作为这个线程的第一个任务(firstTask)执行,ture表示创建核心线程
            if (addWorker(command, true))
                return;//如果添加任务成功,那么这个执行方法就结束了,提交任务嘛,线程池已经接受这个任务了,说明它的职责也就结束了。addWorker返回false表示线程池不允许提交任务
            c = ctl.get();
        }

        //----如果当前线程池数量 >=(超过) 核心线程数corePoolSize, 会执行到此处---

        //当前线程处于RUNNING状态 &&(并且) 尝试offer任务排队成功(offer返回true表示未满,返回false表示队列满了)
        if (isRunning(c) && workQueue.offer(command)) {
            //尽管任务command已经offer排队成功,但是有可能在排队成功后就出现了线程池关闭情况,所以需要再次检查
            int recheck = ctl.get();
            //如果再次检查发现当前线程处于非RUNNING,并且成功地把当前这个任务从队列删掉,那么就拒绝当前任务
            if (!isRunning(recheck) && remove(command))
                reject(command);//拒绝当前任务
            //如果当前线程池线程的数量为0,也就是核心线程数corePoolSize为0,全部都是非核心线程时,那么就创建非核心线程,来执行队列中的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果当前线程池数量 >=(超过) 核心线程数corePoolSize &&(并且) 队列满了,就创建非核心线程来执行任务command
        else if (!addWorker(command, false))
        //如果非核心线程创建失败,也就是当前线程池线程数超过最大线程数,就执行拒绝任务
            reject(command);
    }

6、第二步,再分析一个特别重要的方法addWorker(Runnable firstTask, boolean core)

7、第三步,内部类ThreadPoolExecutor.Worker实现

8、第四步,通过addWorker方法,创建了一个Worker对象,并且启动了一个新的线程来执行,线程启动后,Worker中的run就会被回调,因为Worker实现Runnable接口,创建Thread的时把自己作为Runnable参数传入来创建。

 /** Delegates main run loop to outer runWorker. */
   public void run() {
   ////注意: 这个run是执行在线程池中工作线程中,完成了线程的切换,然后委托调用它的外部类ThreadPoolExecutor中的runWorker(this)方法,并把Worker对象传递出去
       runWorker(this);
   }

9、第五步,执行runWorker(worker)方法,运行在执行工作线程环境中

10、第六步,继续看下getTask方法是如何获取任务

getTask获取任务主要有三种可能:

  • 1、当只有核心线程且当前线程数超过了corePoolSize核心线程数,核心线程不会被回收,getTask就会一直阻塞直到有任务返回,否则就会一直等待任务

  • 2、超时退出,如果keepAliveTime起作用的时候,在keepAliveTime时间内都没有任务,那么就会执行关闭

  • 3、如果触发以下条件直接返回null:

当前线程池中超过了maximumPoolSize最大线程数;

线程池处于SHUTDOWN状态,而且workQueue是空的,不再接收新的任务

线程池处于STOP状态,不接受新的任务,且还要终止执行workQueue中的任务

十一、总结

1、核心口诀:(4、4、5、7)

4: Executors类中4种内置线程池: newFixedThreadPool、newScheduleThreadPool、newCachedThreadPool、newSingleThreadPool

4: 4个拒绝策略: AbortPolicy、CallRunsPolicy、DiscardPolicy、DiscardOldestPolicy

5: 线程池5种状态: Running、Shutdown、Stop、Tidying、Terminated

7: 7个核心参数: corePoolSize(最大核心线程数)、maximumPoolSize(线程池最大线程数)、keepAliveTime(非核心线程超时时间、若allowCoreThreadTimeOut为true,那么它也是核心线程的超时时间)、Unit(keepAliveTime单位)、workQueue(任务队列)、threadFactory(创建线程工厂类对象)、handler(RejectedExecutionHandler拒绝任务策略)

2、Java线程池中有哪些关键属性?

在Java线程池中主要的核心实现类是ThreadPoolExecutor, 它的构造器关键参数主要6个:

  • 1、corePoolSize: 核心线程数,核心线程默认空闲状态不会被回收,除非设置allowCoreThreadTimeOut为true
  • 2、maximumPoolSize: 线程池允许的最大线程数,当超过该maximumPoolSize,新的任务就会被拒绝,而处于corePoolSize和maximumPoolSize之间的非核心线程在超过keepAliveTime就会被回收
  • 3、keepAliveTime: 默认是针对非核心线程空闲超时时间,超过该时间就会被回收,如果设置了allowCoreThreadTimeOut为true,该规则同样是适用于核心线程,最后会执行关闭这些线程的操作。
  • 4、workQueue: 任务队列,用于存放任务,如果当前线程数超过了coreThreadPool核心线程数,就会把任务添加到任务队列,线程池中的线程就会负责到队列中拉取任务。
  • 5、threadFactory: 创建新线程的工厂类
  • 6、RejectedExecutionHandler: 拒绝任务策略抽象接口,用于处理当线程池不能执行此任务时的情况,主要有四种策略实现: AbortPolicy(默认)、CallerRunPolicy、DiscardPolicy、DiscardOldestPolicy

3、说说线程池中的线程创建时机?

  • 1、如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
  • 2、如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
  • 3、如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。

4、Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 构造出来的线程池有什么差别?

  • 1、newFixedThreadPool只有核心线程,且核心线程数固定,采用的是BlockingQueue任务队列,一般用于控制并发数场景
  • 2、而newCachedThreadPool()没有核心线程,只有非核心线程且线程数量无限制,采用的是SynchronousQueue同步任务队列,keepAliveTime空闲状态超时时间为60s,所以回收线程池几乎不占用系统资源,会充分利用空闲线程,一般用于大量耗时较少的任务。

5、任务执行过程中发生异常怎么处理?

如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。

6、什么时候会执行拒绝策略?

拒绝任务主要是两种情况下会执行拒绝任务操作

  • 1、线程池中的线程数大于等于corePoolSize时,此时任务需要把它加入到任务队列中,任务入队成功后,但是此时可能线程池状态为非RUNNING运行状态,此时就任务还没有出队列被执行,这时候需要把它从队列中remove成功后就执行reject(command)拒绝策略
  • 2、线程池中的线程数大于等于corePoolSize核心线程数,将任务加入到任务队列中,但是此时队列已经满了,就需要去创建非核心线程,但是此时线程数超过了maximumPoolSize最大线程数,就执行拒绝任务策略。

   转载规则


《Android线程池ThreadPoolExecutor源码解析》 mikyou 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
Java中反射源码解析 Java中反射源码解析
Java中反射源码解析一、反射原理阐述1、从应用层角度 在JVM类型的编程语言中,任何一个编译好的类都会生成class文件, 在被类加载器加载完后都会有一个java.lang.Class<T>这个类Class实例对应。所以每个类
2019-12-29
下一篇 
Android性能优化之内存管理 Android性能优化之内存管理
Android性能优化之内存管理一、内存分配说到Android的内存分配,就不得不提Java中的内存管理。Java程序在运行时将数据划分为若干不同数据区: 方法区(线程共享)、堆区-heap区(线程共享)、虚拟机栈(线程私有)、本地方法栈(
2019-12-27
  目录