1,前段時間換工作的時候,關於AsyncTask源碼這個點基本上大一點的公司都會問,所以今天就和大家一起來總結總結。本來早就想寫這篇文章的,當時寫《Android -- 從源碼解析Handle+Looper+MessageQueue機制》的時候就是想為這篇文章做鋪墊的,因為AsyncTask說里面還是使用的handle,所以先就寫了handle這一篇。記得15年底去美團面試的時候,面試官就問我既然存在handle為什么google還要出AsyncTask(畢竟底層還是用的handle+Executor),當時也是不太懂。只是簡單的說了下AsyncTask是對handle的封裝,可能更加優化,性能好之類的。所以今天就帶大家一起看一下它底層到底是怎么實現的。
2,分析+實例
在12年之前那時候還沒有一些xtuils、volley的第三方的網絡框架,也沒有asyncTask的出現,那時候要請求一個網絡數據,首先build request參數,然后由於請求網絡是耗時操作,所以你得有個Executer或者線程,然后enqueue后,通過線程去run你的請求,得到服務器數據后,callback回調給你的上層。
在沒有框架的年代,想要做一次請求,是萬分痛苦的,你需要自己管理線程切換,需要自己解析讀取數據,解析數據成對象,切換回主線程,回調給上層。
然后我們的AsyncTask順勢而生了,它不需要我們程序員再手動管理線程,動手寫回調之類,為了防止有些同學壓根都不知道這個類,所以這里我還是帶着大家從一些概念到實例,再到源碼。
public abstract class AsyncTask<Params, Progress, Result> { }
當我們使用asynctask的時候一般都會創建一個類繼承自它,且需要確定它的三個泛型,這里的三種泛型類型分別代表“啟動任務執行的輸入參數”、“后台任務執行的進度”、“后台計算結果的類型”。在特定場合下,並不是所有類型都被使用,如果沒有被使用,可以用Java.lang.Void類型代替。
在一般的get請求情況下我們第一個參數是傳遞的我們請求的地址,所以這里我們會傳遞一個String,而第二個參數是我們的后台執行的進度,如果你的需求需要實時的向用戶展示請求的進度的話這里就可以填寫Integer類型,否寫可以寫上Void,第三個參數是我們請求的結果,一般是byte[],String等,可以根據具體的需求。
再讓我們看看它里面的幾個重要的方法:
① onPreExecute():一般用來在執行后台任務前對UI做一些標記,例如dialog的show。 ② doInBackground(Params... params) :用於執行較為耗時的操作,此方法將接收輸入參數和返回計算結果。在執行過程中可以調用publishProgress(Progress... values)來更新進度信息。 ③ publishProgress(Progress... values):用來更新進度 ④ onProgressUpdate(Progress... values):在調用publishProgress(Progress... values)時,此方法被執行,直接將進度信息更新到UI組件上。 ⑤.onPostExecute(Result result):當后台操作結束時,此方法將會被調用,計算結果將做為參數傳遞到此方法中,將結果回調顯示到UI組件上。
這里需要注意幾個點:上面這五個方法都不是手動調用的,當你調用ascyntask.execute()方法之后上面的方法會自動調用、doInBackground是運行在子線程,不能進行ui操作。好了,大致的知識點都了解了,我們開始寫一個栗子。
先看一下效果:
讓我們直接來看一下自定義的MyAsyncTask的代碼:
MyAsyncTask.java
public class MyAsyncTask extends AsyncTask<String ,Integer,byte[]> { /** * 被調用后立即執行,一般用來在執行后台任務前對UI做一些標記 */ @Override protected void onPreExecute() { super.onPreExecute(); Log.i("wangjitao","onPreExecute"+Thread.currentThread().getName()); } /** * 在onPreExecute()完成后立即執行,用於執行較為費時的操作,此方法將接收輸入參數和返回計算結果。在執行過程中可以調用publishProgress(Progress... values)來更新進度信息。 * @param params * @return */ @Override protected byte[] doInBackground(String... params) { Log.i("wangjitao","doInBackground"+Thread.currentThread().getName()); //請求網絡數據 InputStream inputStream = null; HttpURLConnection httpURLConnection = null; try { URL url = new URL(params[0]); if (url != null) { httpURLConnection = (HttpURLConnection) url.openConnection(); // 設置連接網絡的超時時間 httpURLConnection.setRequestMethod("GET");//GET和POST必須全大寫 httpURLConnection.setConnectTimeout(10000);//連接的超時時間 httpURLConnection.setReadTimeout(5000);//讀數據的超時時間 // 表示設置本次http請求使用GET方式請求 httpURLConnection.setRequestMethod("GET"); int responseCode = httpURLConnection.getResponseCode(); if (responseCode == 200) { // 從服務器獲得一個輸入流 InputStream is = httpURLConnection.getInputStream(); long total = httpURLConnection.getContentLength(); int count = 0; ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; int len=0; while((len=is.read(buffer))!=-1){ baos.write(buffer, 0, len); count += len; publishProgress((int) ((count / (float) total) * 100)); //為了演示進度,休眠500毫秒 Thread.sleep(500); } is.close(); byte[] datas = baos.toByteArray(); baos.close(); return datas; } } } catch (Exception e) { } return null; } /** * 當前進度的更新 * @param values */ @Override protected void onProgressUpdate(Integer... values) { Log.i("wangjitao","onProgressUpdate"+Thread.currentThread().getName()); mDialog.setProgress(values[0]); } /** * 當后台操作結束時,此方法將會被調用,計算結果將做為參數傳遞到此方法中,直接將結果顯示到UI組件上。 * @param bytes */ @Override protected void onPostExecute(byte[] bytes) { Log.i("wangjitao","onPostExecute"+Thread.currentThread().getName()); mDialog.dismiss(); mImageView.setImageBitmap(BitmapFactory.decodeByteArray(bytes, 0, bytes.length)); } }
這里在點擊button之后我調用了new MyAsyncTask().execute(url);,然后再MyAsyncTask的每個方法里面添加上了當前線程的log,想看一下doInBackground是不是像我們網上說的運行在子線程,這里注意一下,我們請求的是網絡數據,記得在清單文件加上網絡權限,打印結果如下:
08-16 05:36:24.978 19453-19453/com.ysten.anysctasksource I/wangjitao: onPreExecute:main 08-16 05:36:24.979 19453-23587/com.ysten.anysctasksource I/wangjitao: doInBackground:AsyncTask #1 08-16 05:36:28.573 19453-19453/com.ysten.anysctasksource I/wangjitao: onProgressUpdate:main 08-16 05:36:29.074 19453-19453/com.ysten.anysctasksource I/wangjitao: onProgressUpdate:main 08-16 05:36:29.574 19453-19453/com.ysten.anysctasksource I/wangjitao: onProgressUpdate:main 08-16 05:36:30.075 19453-19453/com.ysten.anysctasksource I/wangjitao: onProgressUpdate:main 08-16 05:36:30.575 19453-19453/com.ysten.anysctasksource I/wangjitao: onProgressUpdate:main 08-16 05:36:31.086 19453-19453/com.ysten.anysctasksource I/wangjitao: onProgressUpdate:main 08-16 05:36:31.588 19453-19453/com.ysten.anysctasksource I/wangjitao: onPostExecute:main
可以看到doInBackground的確運行在子線程中,過一下我們從源碼里面來驗證一下。
3,源碼分析
我們知道啟動我們整個任務的就是我們new MyAsyncTask().execute(url);方法,所以我們首先來看一下execute()方法
@MainThread public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); } @MainThread public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; exec.execute(mFuture); return this; }
第3行:我們知道execute方法是調用本類的executeOnExecutor。
第10-20行:用來判斷當前AsyncTask的狀態,一共存在三種狀態 :PENDING(就緒)、RUNNING(運行)、FINDISH(完成),保證每次都是第一次運行此異步任務,不然就拋異常
第22行:將當前的運行狀態切換至RUNNING狀態
第24行:執行了我們的onPreExecute() ,注意一下,這時候我們的第一個准備操作的方法被執行了(所以一般我們在這個方法做一些ui的准備操作)
第26行:將我們的params復制成員變量mWorker對象的mParams上(有同學說這里我還不知道mWorker,先不要慌,繼續往下看)
第27行:exec.execute(mFuture);這行代碼我們更加一臉懵逼,mFuture、exec分別代表什么對象呢?exec.execute()是用來干什么的呢?
ok,看完 上述代碼我們一定對mWorker、mFuture、exec這些成員變量有所疑惑吧,沒事我們來一點點的來看。
首先來看一下我們的mWorker究竟是什么,源碼如下:
private final WorkerRunnable<Params, Result> mWorker; private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; } public interface Callable<V> { V call() throws Exception; } mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Result result = null; try { Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked result = doInBackground(mParams); Binder.flushPendingCommands(); } catch (Throwable tr) { mCancelled.set(true); throw tr; } finally { postResult(result); } return result; } };
第4-10行:可以看到我們的mWorker是Callable的子類,包含一個call方法和一個變量mParams。
這里要補充一下,mWorker是在構造函數中初始化的,因為是一個抽象類,在這里new了一個實現類,實現了call方法
第14-27行:首先將mTaskInvoked的value設置成true(這里要留心一下這個操作,后面還有地方對mTaskInvoked進行判斷的),然后Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);設置子線程優先執行,然后就執行到了doInBackground(),看到沒,這里就到了我們的第二個方法,這里還有一個疑問,我們上面知道doInBackground是執行在子線程中的,那怎么執行在子線程中呢(我們帶着這個問題),先繼續往下看。
當運行到finally代碼塊中調用的是 postResult(result);我們接下來看看postResult()的具體代碼:
private Result postResult(Result result) { @SuppressWarnings("unchecked") Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT, new AsyncTaskResult<Result>(this, result)); message.sendToTarget(); return result; } @SuppressWarnings({"RawUseOfParameterizedType"}) private static class AsyncTaskResult<Data> { final AsyncTask mTask; final Data[] mData; AsyncTaskResult(AsyncTask task, Data... data) { mTask = task; mData = data; } }
第3-5行:很熟悉沒?了解我們的異步消息handle的,都知道這是干什么的而 AsyncTaskResult類就是把當前從網絡下請求的結果數據result保存到data中
看到這,我相信大家肯定會想到,在某處肯定存在一個handler,且重寫了handleMessage方法等待消息的傳入
private static class InternalHandler extends Handler { public InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } }
果然,在handleMessage中我們的MESSAGE_POST_RESULT之后時調用的本身的finish()方法(這里也請大家留心一下MESSAGE_POST_PROGRESS這個消息,后面也會用到),那么讓我們再看看finish里面的具體代碼吧
private void finish(Result result) { if (isCancelled()) { onCancelled(result); } else { onPostExecute(result); } mStatus = Status.FINISHED; }
首先判斷當前的asynctask是否被取消,若果沒取消則執行onPostExecute(result),這時候數據已經回調了,到這里我們就差不多執行完了幾個重要的方法了,然后再將asynctask的執行狀態切換到FINISH狀態。
到這里我們的mWorker的工作流程全部了解了,繼續往下看到我們mFuture這個變量
private final FutureTask<Result> mFuture; mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } };
mFuture這個變量也是初始化在我們的構造函數里面,首先我們不去看FutureTask這個類中的具體代碼包含的方法和變量(由於代碼有點多,后面的話單獨的給大家分析一下),我們先繼續往下看,mFuture的初始化是將將mWorker作為參數,並重寫其done()方法。
done方法中具體的時調用postResultIfNotInvoked()方法,而get()方法里面是獲取的我們的Result泛型,拿到的是我們mWorker.call的返回值,看一下具體的代碼
private void postResultIfNotInvoked(Result result) { final boolean wasTaskInvoked = mTaskInvoked.get(); if (!wasTaskInvoked) { postResult(result); } }
如果mTaskInvoked不為true,則執行postResult;但是在mWorker初始化時就已經將mTaskInvoked為true(我在上面call方法里面提醒過大家的),所以一般這個postResult執行不到。
ok,到這里我們我們基本上都了解都mWorker、mFuture的意思了,接下來看看我們的exec這個變量的含義吧
public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); } private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR; public static final Executor SERIAL_EXECUTOR = new SerialExecutor(); private static class SerialExecutor implements Executor { final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); Runnable mActive; public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }
第1-3行:從上面的代碼我們可以知道,exec為executeOnExecutor(sDefaultExecutor, params)中的sDefaultExecutor
第5-7行:sDefaultExecutor實際上是一個SerialExecutor對象
第9-26行:這里我們可以看到SerialExecutor里面內部維持一個任務隊列,在execute方法中將runnable放入mTasks隊尾,然后判斷當前mActive是否為空,如果不為空在調用scheduleNext()方法;
第28-33行:取出任務隊列中的隊首任務,如果不為null則賦值給mActive對象並傳入THREAD_POOL_EXECUTOR進行執行。
所以到這里我們再來看看THREAD_POOL_EXECUTOR是一個什么鬼!
public static final Executor THREAD_POOL_EXECUTOR; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); threadPoolExecutor.allowCoreThreadTimeOut(true); THREAD_POOL_EXECUTOR = threadPoolExecutor; private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4)); private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
可以看到THREAD_POOL_EXECUTOR其實就是一個線程池
第9-12行:是一些線程池的配置,在3.0以前最大線程並發數MAXIMUM_POOL_SIZE是一個固定值128,現在都是根據當前cpu合數來動態設置的,例如現在的床鋪是四核的,所以MAXIMUM_POOL_SIZE為9,並且將阻塞線程由以前的10個變成現在的128個(記得前不久面試的時候面試官就問道為什么會設置128這個數值為什么不是129或者140,到現在我還是不知道怎么回答這個問題),那么是不是意味着我們並發的線程數超過137個之后,我們的程序就會拋異常,其實不是這樣的,我們之前的SerialExecutor 類中的execute里面的代碼
public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } }
當存在10個任務的時候,第一個任務進來,mActive為空執行scheduleNext,在scheduleNext方法中取出線程頭部添加到線程池中,然后復制給mActive,當第二個任務進來時,mActive不為空,也就是說不執行scheduleNext()方法,所以只有等到第一個任務run方法執行完之后調用finally中的scheduleNext()才會執行下一個任務,所以來說其實還是單線程線性執行,一個接一個。
ok,到這里我們基本上完成了對源碼的解讀,但是我們還是有幾個疑問,我們的onProgressUpdate()和publishProgress()怎么沒有執行啊,onProgressUpdate方法只有在調用publishProgress()之后才執行,所以讓我們來看看publishProgress()的源碼
@WorkerThread protected final void publishProgress(Progress... values) { if (!isCancelled()) { getHandler().obtainMessage(MESSAGE_POST_PROGRESS, new AsyncTaskResult<Progress>(this, values)).sendToTarget(); } }
很熟悉,又發現了我們的handle,繼續找到我們的消息接受的地方
private static class InternalHandler extends Handler { public InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } }
這不是我們剛剛看過的代碼,沒錯,上面我叫大家留意過的MESSAGE_POST_PROGRESS就是這個用於更新我們進度的,在里面直接調用onProgressUpdate()方法。
ok,我們現在還是有一個疑問,就是為什么說doInBackground運行在子線程中
我們進過上面的源碼分析指導doInBackground是在mWorker的call方法中調用的,所以我們現在只需要在哪里調用了mWorker.call()代碼就行
mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Result result = null; try { Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked result = doInBackground(mParams); Binder.flushPendingCommands(); } catch (Throwable tr) { mCancelled.set(true); throw tr; } finally { postResult(result); } return result; } };
由於我們知道mWorker是以參數傳遞到mFuture中的,所以我們還是要看FutureTask這個類的源碼
public class FutureTask<V> implements RunnableFuture<V> { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public void run() { if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }
這里我只展示除了重要的幾個方法,我們知道mFuture是以參數傳遞到SerialExecutor類中的execute方法中
public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } }
所以這里new了一個runnable創建了一個子線程,並調用mFuture的run方法,而在mFuture的run方法中有一行result = c.call();這里的c就是我們的mWorker,所以這也就解釋了我們的doInBackground是運行在子線程中。
ok,到這里我們的問題解決的差不多了,但是有一些公司喜歡問一下3.0之前的asynctask 的缺陷問題,在這里給大家找了一寫,就直接貼出來了,夠大家應付面試,具體是怎么產生的就大家自己去研究了,篇幅有點長,就不過多介紹了。
如果現在大家去面試,被問到AsyncTask的缺陷,可以分為兩個部分說,在3.0以前,最大支持128個線程的並發,10個任務的等待。在3.0以后,無論有多少任務,都會在其內部單線程執行;
這是GitHub代碼,有需要的同學可以去下載一下.在寫線程池這一塊感覺自己不是很熟悉,所以下一篇打算來總結總結線程池。