Android中AsyncTask源码解析
一、AsyncTask描述
AsyncTask是一种轻量级的异步任务处理类,它可以在线程池中执行后台任务,并且能把执行进度以及最终在线程池中执行异步任务结果传递主线程进行UI刷新,也就是我们常说的线程切换。这是因为它内部采用了Thread+Handler的消息机制实现,实现线程的切换。通过AsyncTask可以更加方便地执行异步任务,并且容易切换到UI线程更新UI,但是AsyncTask不适合特别耗时且大量并发的异步任务。因为线程池支持的并发数是2~4个,且任务队列长度固定128;任务数一旦超过会出现后续任务拒绝抛出异常。
二、AsyncTask类的介绍
AsyncTask<Params, Progress, Result>
是一个抽象泛型类,它主要有三个泛型参数,Params, Progress, Result
.Params
表示传入参数的类型,Progress
表示后台任务执行的进度的类型,Result
就是后台任务返回结果的类型。
三、AsyncTask类5个核心方法介绍
1、onPreExecute方法:
在主线程执行,在异步任务执行前触发,一般用于做一些前期初始化准备工作
public class HttpTask extends AsyncTask<String, Integer, String> {
@Override
protected void onPreExecute() {//执行环境:主线程
super.onPreExecute();
}
}
2、doInBackground(params)方法
在线程池中执行,用于执行异步任务,params表示异步任务执行的输入参数;注意:在doInBackground内部调用publishProgress(progress)方法,然后在外部主线程重写的onProgressUpdate方法才能被回调。(实际上很简单就是publishProgress内部通过Handler在线程池中发送消息,然后在主线程中Looper获取消息进行分发最后到handleMessage,在handleMessage方法中就调用onProgressUpdate函数)
public class HttpTask extends AsyncTask<String, Integer, String> {
@Override
protected String doInBackground(String... params) {//执行环境:工作线程(子线程)
return null;
}
}
3、onProgressUpdate方法
该方法用与获取后台任务执行的进度,所以该方法得以回调必须在doInBackground中触发publishProgress才可以。
public class HttpTask extends AsyncTask<String, Integer, String> {
@Override
protected void onProgressUpdate(Integer... values) {//执行环境:主线程
super.onProgressUpdate(values);
}
}
4、onPostExecute方法
该方法就是用于返回后台任务执行的结果,该方法触发是在后台任务执行完毕后,拿到返回的结果,并且当前任务没有被cancell掉,就会触发回调onPostExecute方法。当然onPostExecute线程切换也是通过Handler在线程池发送消息,然后主线程回调handleMessage触发相应的onPostExecute方法
public class HttpTask extends AsyncTask<String, Integer, String> {
@Override
protected void onPostExecute(String s) {//执行环境:主线程
super.onPostExecute(s);
}
}
5、onCancelled方法
该方法很简单就是当后台任务被cancel调后,在执行最后的handleMessage方法中的finish中判断如果是isCancelled就触发onCancelled方法不会去触发onPostExecuted
public class HttpTask extends AsyncTask<String, Integer, String> {
@Override
protected void onCancelled() {//执行环境:主线程
super.onCancelled();
}
}
四、AsyncTask基本使用
public class HttpTask extends AsyncTask<String, Integer, String> {
private OkHttpClient mOkHttpClient;
@Override
protected void onPreExecute() {//执行环境:主线程, 主要是做一些执行之前初始化工作
printThreadLog("onPreExecute");
mOkHttpClient = new OkHttpClient();
}
@Override
protected String doInBackground(String... params) {//执行环境:工作线程(子线程)
printThreadLog("doInBackground");
publishProgress(100);//注意:只有在doInBackground方法中调用了publishProgress方法,外部的onProgressUpdate才会被回调
if (params.length == 0) throw new IllegalArgumentException("params is not allowed empty");
Request request = new Request.Builder()
.url(params[0] != null ? params[0] : "https://www.google.com")
.method("GET", null)
.build();
try {
Response response = mOkHttpClient.newCall(request).execute();
if (response.body() != null) {
return response.body().string();
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
protected void onProgressUpdate(Integer... values) {//执行环境:主线程
printThreadLog("onProgressUpdate");
}
@Override
protected void onPostExecute(String result) {//执行环境:主线程
printThreadLog("onPostExecute: result is " + result);
}
@Override
protected void onCancelled() {//执行环境:主线程
printThreadLog("onCancelled");
}
private void printThreadLog(String msg) {
Log.d("HttpTask", "current_thread: " + Thread.currentThread().getName() + " msg: " + msg);
}
}
//调用
HttpTask httpTask = new HttpTask().execute("https://www.shanbay.com");
五、AsyncTask原理介绍
AsyncTask原理: 实际上两个线程池+Handler,其中
SerialExecutor
是为了调度多个任务,让多个任务串行排列;ThreadPoolExecutor
则是实现真正异步请求以及工作子线程的调度;而Handler
则用于异步通信(线程间切换).
六、AsyncTask源码分析
1、首先,来分析下AsyncTask两个线程池配置参数, ThreadPoolExecutor是通过static代码块初始化的;而SerialExecutor则是一个static常量初始化的。
//初始化ThreadPoolExecutor线程池配置
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();//获取当前CPU核数
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));//核心线程数控制在2~4个之间
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;//最大线程数等于CPU核数 * 2 + 1
private static final int KEEP_ALIVE_SECONDS = 30;//非核心线程空闲超时时间是30s
//线程池创建线程的工厂对象sThreadFactory
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//任务队列sPoolWorkQueue,长度固定为128
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR;
static {
//初始化ThreadPoolExecutor对象
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
sPoolWorkQueue,
sThreadFactory);//默认的任务拒绝策略是AbortPolicy
threadPoolExecutor.allowCoreThreadTimeOut(true);//允许核心线程超时,也就是说keepAliveTime为30s空闲超时时间对核心线程也适用。
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
//初始化SerialExecutor线程池
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
2、然后,进入AsyncTask构造器分析
public AsyncTask() {
this((Looper) null);//默认Looper为null,通过looper构建Handler
}
public AsyncTask(@Nullable Handler handler) {//传入handler,取出handler
this(handler != null ? handler.getLooper() : null);
}
public AsyncTask(@Nullable Looper callbackLooper) {
//如果Looper为null或是MainLooper,那么就直接获取MainHandler,默认就是执行主线程中异步任务,实际上AsyncTask还可以执行其他工作线程中的异步任务,比如工作线程A中执行异步任务,只要外部传入的Looper是工作线程A中即可。
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
//创建一个WorkerRunnable对象,它实现了Callable接口,重写了一个带返回值的call方法
mWorker = new WorkerRunnable<Params, Result>() {
//重写了带返回值的call方法,返回值类型就是Result泛型类型,这个call方法最终会被线程run方法触发,所以它的执行环境是子线程或工作线程
public Result call() throws Exception {
mTaskInvoked.set(true);//标识当前任务已经执行过了
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);//设置后台执行工作线程优先级,也就是标准后台任务线程优先级10
//调用doInBackground方法,并返回执行结果,doInBackground是一个工作线程,然后在工作线程执行了一个同步的请求,不会阻塞主线程,然后拿到同步请求的结果,result.
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
//如果出现异常,表示任务已被自动取消
mCancelled.set(true);
throw tr;
} finally {
//最后通过postResult方法将执行任务结果传递出去
postResult(result);
}
return result;
}
};
//创建FutureTask来包装刚创建的mWorker(Callable对象),我们知道线程池可支持直接提交一个FutureTask任务,FutureTask实现了RunnableFuture接口,有一个run抽象方法,FutureTask任务在线程池中线程Thread的run方法执行,会触发FutureTask的run方法,然后在它的run方法中会触发mWorker(Callable)的call方法,以致于最后会执行doInBackground方法
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
3、进入AsyncTask中的execute方法
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);//调用executeOnExecutor方法,如果没有单独指定默认线程池,就使用默认的线程池sDefaultExecutor也就是默认构造的SerialExecutor,它主要的任务是将异步任务串行调度执行,所以AsyncTask默认启动是串行的。
}
// executeOnExecutor
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
//mStatus默认是Status.PENDING,如果有任务进入就切换成RUNNING状态,也说明同一个任务在每执行完之前,不能同时触发执行多次,否则会抛出异常
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
//切换状态为RUNNING
mStatus = Status.RUNNING;
//触发回调onPreExecute方法,此时执行环境是主线程
onPreExecute();
mWorker.mParams = params;//给mWorker初始化mParams,用于call方法doInBackground方法回调参数
exec.execute(mFuture);//通过调用SerialExecutor的execute方法,将FutureTask提交到任务调度器中。
return this;
}
4、进入SerialExecutor中的execute(mFuture)方法
private static class SerialExecutor implements Executor {
//SerialExecutor内部创建了一个ArrayDeque作为任务队列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
//传入的FutureTask本质也是一个Runnable
//然后再使用Runnable包装一个FutureTask,最后调用offer方法将最终包装的Runnable任务插入到mTask任务队列中
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();//如果当前任务中run执行完毕后,队列中还有任务的话,就通过finally中的scheduleNext方法串行调度下一个任务
} finally {
scheduleNext();
}
}
});
//如果首次进入,mActive任务为空,就调用scheduleNext开始第一次任务调度
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//从任务队列中取出任务,并初始化mActive任务,如果任务不为空就调用execute把mActive任务提交到ThreadPoolExecutor中执行
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
5、进入ThreadPoolExecutor中的execute(Runnable)方法,实际上就是进入线程池执行异步任务的逻辑流程了。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//ctl初始值为默认RUNNING状态值为-536870912
private static int ctlOf(int rs, int wc) { return rs | wc; }//RUNNING | 0 = RUNNING;
private static int workerCountOf(int c) { return c & CAPACITY;}//workerCountOf(c) = RUNNING & CAPACITY =>
1110 0000 0000 0000 0000 0000 0000 0000
&
0001 1111 1111 1111 1111 1111 1111 1111
=
0000 0000 0000 0000 0000 0000 0000 0000
//所以初始线程池的数量为0
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();//任务为空,抛出空指针异常
//通过AtomicInteger对象ctl拿到它的值,ctl初始值为默认RUNNING状态值为-536870912
int c = ctl.get();
//通过workerCountOf拿到线程池中线程数量,最开始默认线程池中线程数为0
//----如果当前线程池线程数量 <(小于) 核心线程数corePoolSize----
if (workerCountOf(c) < corePoolSize) {
//执行addWorker方法,在内部创建一个新的核心线程来执行任务,当前任务command会作为这个线程的第一个任务(firstTask)执行,ture表示创建核心线程
if (addWorker(command, true))
return;//如果添加任务成功,那么这个执行方法就结束了,提交任务嘛,线程池已经接受这个任务了,说明它的职责也就结束了。addWorker返回false表示线程池不允许提交任务
c = ctl.get();
}
//----如果当前线程池数量 >=(超过) 核心线程数corePoolSize, 会执行到此处---
//当前线程处于RUNNING状态 &&(并且) 尝试offer任务排队成功(offer返回true表示未满,返回false表示队列满了)
if (isRunning(c) && workQueue.offer(command)) {
//尽管任务command已经offer排队成功,但是有可能在排队成功后就出现了线程池关闭情况,所以需要再次检查
int recheck = ctl.get();
//如果再次检查发现当前线程处于非RUNNING,并且成功地把当前这个任务从队列删掉,那么就拒绝当前任务
if (!isRunning(recheck) && remove(command))
reject(command);//拒绝当前任务
//如果当前线程池线程的数量为0,也就是核心线程数corePoolSize为0,全部都是非核心线程时,那么就创建非核心线程,来执行队列中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果当前线程池数量 >=(超过) 核心线程数corePoolSize &&(并且) 队列满了,就创建非核心线程来执行任务command
else if (!addWorker(command, false))
//如果非核心线程创建失败,也就是当前线程池线程数超过最大线程数,就执行拒绝任务
reject(command);
}
6、然后进入ThreadPoolExecutor中的addWorker(Runnable)方法
7、进入Worker的构造器
8、通过addWorker方法,创建了一个Worker对象,并且启动了一个新的线程来执行,线程启动后,Worker中的run就会被回调,因为Worker实现Runnable接口,创建Thread的时把自己作为Runnable参数传入来创建。
/** Delegates main run loop to outer runWorker. */
public void run() {
////注意: 这个run是执行在线程池中工作线程中,完成了线程的切换,然后委托调用它的外部类ThreadPoolExecutor中的runWorker(this)方法,并把Worker对象传递出去
runWorker(this);
}
9、Worker中的Thread线程启动后就会调用Worker中的run方法,然后执行runWorker(worker)方法,运行在执行工作线程环境中
10、runWorker中的task.run方法执行,会触发SerialExecutor中的Runnable中run方法。注意:执行环境为工作线程
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
//此Runnable中的run方法会执行,执行环境为工作线程
try {
r.run();//然后在run方法触发外部提交的mFuture(FutureTask)中的run方法
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
11、触发FutureTask中的回调方法run
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//这里会触发c(WorkRunable)中的call方法,然后在call方法中调用doInBackground方法,那么doInBackground切换到工作线程中
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
12、回调到WorkRunnable中的call方法
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
//调用doInBackground方法,此时doInBackground处于工作线程中,不是主线程
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
13、回调到AsyncTask中的doInBackground方法(执行环境为工作线程)
@Override
protected String doInBackground(String... params) {//执行环境:工作线程(子线程)
printThreadLog("doInBackground");
publishProgress(100);//注意:只有在doInBackground方法中调用了publishProgress方法,外部的onProgressUpdate才会被回调
if (params.length == 0) throw new IllegalArgumentException("params is not allowed empty");
Request request = new Request.Builder()
.url(params[0] != null ? params[0] : "https://www.google.com")
.method("GET", null)
.build();
try {
//在工作线程直接执行OkHttp的同步请求,相对于主线程而言就是异步请求了
Response response = mOkHttpClient.newCall(request).execute();
if (response.body() != null) {
//最后返回同步请求的结果
return response.body().string();
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
14、再次回到WorkRunnable中的call方法,doInBackground执行完毕后,拿到了同步请求的结果,返回了最终任务结果,并通过postResult方法result返回
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);//拿到了doInBackground同步请求数据结果result后
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);//最后调用postResult方法将数据传递出去
}
return result;
}
};
15、进入postResult方法
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
//由于postResult还是处于另一个工作线程中,需要通过getHandler拿到一个Handler对象来发送消息,把最终的结果和当前的AsyncTask对象包装成一个AsyncTaskResult,放入Message传递到另一个线程(一般默认为主线程),从而实现线程的切换
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
private Handler getHandler() {
return mHandler;
}
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
...
}
private static Handler getMainHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler(Looper.getMainLooper());
}
return sHandler;
}
}
16、在Main Looper中的loop函数中的MessageQueue调用next方法取到消息后,交于InternalHandler分发和处理消息,跳入到InternalHandler中的handleMessage方法(此时已经切换到了主线程)
//AsyncTaskResult由AsyncTask实例和Data不定长数组组成
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
//此时已经完成了线程切换,默认是切换到主线程中,取出消息中的数据是一个AsyncTaskResult,
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// 取出AsyncTaskResult中的AsyncTask实例,并调用finish方法,传入最终结果result.mData[0]
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
17、最后,进入AsyncTask中的finish方法
private void finish(Result result) {
if (isCancelled()) {//如果任务已经被cancelled了,直接回调onCancelled方法
onCancelled(result);
} else {//否则正常执行,回调最终的onPostExecute方法,并把最终的Result回调出去。
onPostExecute(result);
}
mStatus = Status.FINISHED;
}