Android多線程分析之四:MessageQueue的實現
在前面兩篇文章《Android多線程分析之二:Thread的實現》,《Android多線程分析之三:Handler,Looper的實現》中分別介紹了 Thread 的創建,運行,銷毀的過程以及 Thread與 Handler,Looper 之間的關聯:Thread 在其 run() 方法中創建和運行消息處理循環 Looper,而 Looper::loop() 方法不斷地從 MessageQueue 中獲取消息,並由 Handler 分發處理該消息。接下來就來介紹 MessageQueue 的運作機制,MessageQueue。
參考源碼:
android/framework/base/core/java/android/os/MessageQueue.java android/framework/base/core/java/android/os/Message.java android/frameworks/base/core/jni/android_os_MessageQueue.h android/frameworks/base/core/jni/android_os_MessageQueue.cpp
先來看 MessageQueue 的構造函數以及重要的成員變量:
// True if the message queue can be quit. private final boolean mQuitAllowed; private int mPtr; // used by native code Message mMessages; private boolean mQuiting; // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout. private boolean mBlocked;
mQuitAllowed: 其含義與 Looper.prepare(boolean quitAllowed) 中參數含義一直,是否允許中止;
mPtr:Android MessageQueue 是通過調用 C++ native MessageQueue 實現的,這個 mPtr 就是指向 native MessageQueue;
mMessages:Message 是鏈表結構的,因此這個變量就代表 Message 鏈表;
mQuiting:是否終止了;
mBlocked:是否正在等待被激活以獲取消息;
MessageQueue 的構造函數很簡單:
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; nativeInit(); }
它通過轉調 native 方法 nativeInit() 實現的,后者是定義在 android_os_MessageQueue.cpp 中:
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return; } nativeMessageQueue->incStrong(env); android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue); } static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj, NativeMessageQueue* nativeMessageQueue) { env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr, reinterpret_cast<jint>(nativeMessageQueue)); }
nativeInit() 方法創建 NativeMessageQueue 對象,並將這個對象的指針復制給 Android MessageQueue 的 mPtr。NativeMessageQueue 的定義如下:
class MessageQueue : public RefBase { public: /* Gets the message queue's looper. */ inline sp<Looper> getLooper() const { return mLooper; } bool raiseAndClearException(JNIEnv* env, const char* msg); virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) = 0; protected: MessageQueue(); virtual ~MessageQueue(); protected: sp<Looper> mLooper; }; class NativeMessageQueue : public MessageQueue { public: NativeMessageQueue(); virtual ~NativeMessageQueue(); virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj); void pollOnce(JNIEnv* env, int timeoutMillis); void wake(); private: bool mInCallback; jthrowable mExceptionObj; };
其中值得關注的是 NativeMessageQueue 的構造以及pollOnce,wake 兩個方法,它們是Java MessageQueue 中 nativePollOnce 和 nativeWake 的 native 方法:
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } } void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) { mInCallback = true; mLooper->pollOnce(timeoutMillis); mInCallback = false; } void NativeMessageQueue::wake() { mLooper->wake(); }
在 NativeMessageQueue 的構造函數中,會獲取當前線程的 Looper(注意這是 C++ Looper,定義在frameworks/native/libs/utils/Looper.h 中),如果當前線程還沒有 Looper,就創建一個,並保存在線程的 TLS 中。pollOnce 和 wake 最終都是通過 Linux 的 epoll 模型來實現的。pollOnce() 通過等待被激活,然后從消息隊列中獲取消息;wake() 則是激活處於等待狀態的消息隊列,通知它有消息到達了。這是典型的生產者-消費者模型。
對於Android MessageQueue 來說,其主要的工作就是:接收投遞進來的消息,獲取下一個需要處理的消息。這兩個功能是通過 enqueueMessage() 和 next() 方法實現的。next() 在前一篇文章介紹 Looper.loop() 時提到過。
在分析這兩個函數之前,先來介紹一下 Message:前面說過 Message 是完備的,即它同時帶有消息內容和處理消息的 Handler 或 callback。下面列出它的主要成員變量:
public int what; // 消息 id public int arg1; // 消息參數 public int arg2; // 消息參數 public Object obj; // 消息參數 long when; // 處理延遲時間,由 Handler 的 sendMessageDelayed/postDelayed 設置 Handler target; // 處理消息的 Handler Runnable callback; // 處理消息的回調 Message next; // 鏈表結構,指向下一個消息
Message 有一些名為 obtain 的靜態方法用於創建 Message,通常我們都是通過 Handler 的 obtain 靜態方法轉調 Message 的靜態方法來創建新的 Message。
接下來分析 enqueueMessage:
final boolean enqueueMessage(Message msg, long when) { if (msg.isInUse()) { throw new AndroidRuntimeException(msg + " This message is already in use."); } if (msg.target == null) { throw new AndroidRuntimeException("Message must have a target."); } boolean needWake; synchronized (this) { if (mQuiting) { return false; } msg.when = when; Message p = mMessages; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } } if (needWake) { nativeWake(mPtr); } return true; }
首先檢測消息的合法性:是否已經在處理中和是否有處理它的Handler,然后判斷 mQuiting 是否中止了,如果沒有則根據消息處理時間排序將消息插入鏈表中的合適位置。在這其中作了一些減少同步操作的優化,即使當前消息隊列已經處於 Blocked 狀態,且隊首是一個消息屏障(和內存屏障的理念一樣,這里是通過 p.target == null 來判斷隊首是否是消息屏障),並且要插入的消息是所有異步消息中最早要處理的才會 needwake 激活消息隊列去獲取下一個消息。Handler 的 post/sendMessage 系列方法最后都是通過轉調 MessageQueue 的 enqueueMessage 來實現的,比如:
public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); } private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
其實 Handler 中與Message 相關的靜態方法都是通過 MessageQueue 的對應的靜態方法實現的,比如 removeMessages, hasMessages, hasCallbacks 等等,這里就不一一詳述了。至此,已經完整地分析了如何通過 Handler 提交消息到 MessageQueue 中了。
下面來分析如何從 MessageQueue 中獲取合適的消息, 這是 next() 要做的最主要的事情,next() 方法還做了其他一些事情,這些其它事情是為了提高系統效果,利用消息隊列在空閑時通過 idle handler 做一些事情,比如 gc 等等。但它們和獲取消息關系不大,所以這部分將從略介紹。
final Message next() { int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(mPtr, nextPollTimeoutMillis); synchronized (this) { if (mQuiting) { return null; } // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf("MessageQueue", "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
隊列被激活之后,首先判斷隊首是不是消息屏障,如果是則跳過所有的同步消息,查找最先要處理的異步消息。如果第一個待處理的消息還沒有到要處理的時機則設置激活等待時間;否則這個消息就是需要處理的消息,將該消息設置為 inuse,並將隊列設置為非 blocked 狀態,然后返回該消息。next() 方法是在 Looper.loop() 中被調用的,Looper 在獲得要處理的消息之后就會調用和消息關聯的 Handler 來分發消息,這里再回顧一下:
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue; ... for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } msg.target.dispatchMessage(msg); msg.recycle(); } }
如果隊列中沒有消息或者第一個待處理的消息時機未到,且也沒有其他利用隊列空閑要處理的事務,則將隊列設置為設置 blocked 狀態,進入等待狀態;否則就利用隊列空閑處理其它事務。
至此,已經對 Android 多線程相關的主要概念 Thread, HandlerThread, Handler, Looper, Message, MessageQueue 作了一番介紹,下一篇就要講講 AsyncTask,這是為了簡化 UI 多線程編程為提供的一個便利工具類。