Android 消息傳遞機制


線程間消息傳遞機制

http://androidxref.com/6.0.1_r10/xref/frameworks/base/core/java/android/os/Handler.java

http://androidxref.com/6.0.1_r10/xref/frameworks/base/core/java/android/os/MessageQueue.java

http://androidxref.com/6.0.1_r10/xref/frameworks/base/core/java/android/os/Looper.java

http://androidxref.com/6.0.1_r10/xref/frameworks/base/core/java/android/os/Message.java

 1.消息怎么發送的? 

我們都知道當調用Handler發送消息的時候,不管是調用

sendMessage,sendEmptyMessage,sendMessageDelayed還是其他發送一系列方法。最終都會調用sendMessageDelayed(Message msg, long delayMillis)方法。

  public final boolean sendMessageDelayed(Message msg, long delayMillis)
    {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }
該方法會調用sendMessageAtTime()方法。其中第二個參數是執行消息的時間,是通過 從開機到現在的毫秒數(手機睡眠的時間不包括在內)+ 延遲執行的時間。這里不使用System.currentTimeMillis() ,是因為該時間是可以修改的。會導致延遲消息等待時間不准確。該方法內部會調用sendMessageAtTime()方法,我們接着往下走。
   public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
 
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

 

這里獲取到線程中的MessageQueue對象mQueue(在構造函數通過Looper獲得的),並調用enqueueMessage()方法,enqueueMessage()方法最終內部會調用MessageQueue的enqueueMessage()方法,那現在我們就直接看MessageQueue中把消息加入消息隊列中的方法。

消息的加入

當通過handler調用一系列方法如sendMessage()、sendMessageDelayed()方法時,最終會調用MessageQueue的enqueueMessage()方法。現在我們就來看看,消息是怎么加入MessageQueue(消息隊列)中去的。
boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }

        synchronized (this) {
            //如果當前消息循環已經結束,直接退出
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;//頭部消息
            boolean needWake;
            //如果隊列中沒有消息,或者當前進入的消息比消息隊列中的消息等待時間短,那么就放在消息隊列的頭部
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                //判斷喚醒條件,當前當前消息隊列頭部消息是屏障消息,且當前插入的消息為異步消息
                //且當前消息隊列處於無消息可處理的狀態
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                //循環遍歷消息隊列,把當前進入的消息放入合適的位置(比較等待時間)
                for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } //將消息插入合適的位置
                msg.next = p; prev.next = msg;
            }

            //調用nativeWake,以觸發nativePollOnce函數結束等待
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

這里大家肯定注意到了nativeWake()方法,這里先不對該方法進行詳細的描述,下文會對其解釋及介紹。 其實通過代碼大家就應該發現了,在將消息加入到消息隊列中時,已經將消息按照等待時間進行了排序。排序分為兩種情況(這里圖中的message.when是與當前的系統的時間差):

  • 第一種:如果隊列中沒有消息,或者當前進入的消息比消息隊列中頭部的消息等待時間短,那么就放在消息隊列的頭部

 

 第二種:反之,循環遍歷消息隊列,把當前進入的消息放入合適的位置(比較等待時間)

 

 綜上,我們了解了在我們使用Handler發送消息時,當消息進入到MessageQueue(消息隊列)中時,已經按照等待時間進行了排序,且其頭部對應的消息是Loop即將取出的消息

android_os_MessageQueue_nativeWake
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

和之前一樣,也是通過long類型的ptr獲取NativeMessageQueue對象的指針,再調用wake方法

void NativeMessageQueue::wake() {
   mLooper->wake();
}

調用的也是Looper的方法:

void Looper::wake() {
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal: %s", strerror(errno));
        }
    }
}

重點是write(mWakeEventFd, &inc, sizeof(uint64_t)),寫入了一個1,這個時候epoll就能監聽到事件,也就被喚醒了

Looper的wake方法其實是 使用write函數通過mWakeEventFd往管道寫入字符inc,其中TEMP_FAILURE_RETRY 是一個宏定義, 當執行write方法失敗后,會不斷重復執行,直到執行成功為止,在 nativeinit中,我們已經通過 epoll_create方法監聽了mWakeEventFd的可讀事件,當mWakeEventFd可讀時,epoll文件描述符就會監聽到,這時 epoll_wait方法就會從管道中讀取事件返回,返回后就執行消息處理邏輯,所以這里的往管道寫入字符inc,其實起到一個 通知的作用,告訴監聽的線程有消息插入了消息隊列了,快點 醒過來(因為進入了休眠狀態)處理一下。

 

2.怎么樣進行消息循環

我們都知道消息的取出,是通過 Loooper.loop()方法,其中loop()方法內部會調用MessageQueue中的next()方法。
  public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;
        //...
        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }
      //...
            try {
                msg.target.dispatchMessage(msg);
                if (observer != null) {
                    observer.messageDispatched(token, msg);
                }
                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
            } catch (Exception exception) {
              //...
            } finally {
              //...
            }
            //...

            msg.recycleUnchecked();
        }
    }        

從MessageQueue獲取下一條消息

   1.如果是null,退出

   2.如果不為null,dispatchMessage

怎么處理消費的分發過程?

public void dispatchMessage(Message msg){
          //檢查message的callback是否為null
          if(msg.calllback!=null){
              handleCallback(msg);           
           }
            else{
                  if(mCallback!=null){
                            if(mCallback.handleMessage(mssg)){//主意如果這個返回值是true 將不會執行下面的handlerMessage return;
                              } 
                   }
                    handleMessage(msg);
             }
}
Handlet處理消息的過程
首先,檢查Message的callback是否為null,不為空就通過handleCallback來處理消息。message的callback是一個Runnbale對象,實際上就是Handler的post方法所傳遞的Runnable參數。
private static void handleCallback(Message message){
        message.callback.run(); 
}

其次,檢查mCallback是否為空,不為null及調用mCallback的handleMessage方法來處理消息

public interface Callback{
      public boolean handleMessage(Message msg); 
}

 MessageQueue的next方法

Message next() {
        //...
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            
            //執行native層消息機制層,
            //timeOutMillis參數為超時等待時間。如果為-1,則表示無限等待,直到有事件發生為止。
            //如果值為0,則無需等待立即返回。該方法可能會阻塞
 nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                //獲取系統開機到現在的時間,如果使用System.currentMillis()會有誤差,
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;//頭部消息
                
                //判斷是否是柵欄,同時獲取消息隊列最近的異步消息
                if (msg != null && msg.target == null) {
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                  msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                //...
        }
    }
Message next() {
        
        //如果退出消息消息循環,那么就直接退出
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            
            //執行native層消息機制層,
            //timeOutMillis參數為超時等待時間。如果為-1,則表示無限等待,直到有事件發生為止。
            //如果值為0,則無需等待立即返回。該方法可能會阻塞
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                //獲取系統開機到現在的時間,如果使用System.currentMillis()會有誤差,
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;//頭部消息
                
                //判斷是否是柵欄,同時獲取消息隊列最近的異步消息
                if (msg != null && msg.target == null) {
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                //獲取消息,判斷等待時間,如果還需要等待則等待相應時間后喚醒
                if (msg != null) {
                    if (now < msg.when) {//判斷當前消息時間,是不是比當前時間大,計算時間差
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // 不需要等待時間或者等待時間已經到了,那么直接返回該消息
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    //沒有更多的消息了
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                //判斷是否已經退出了
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                //獲取空閑時處理任務的handler 用於發現線程何時阻塞等待更多消息的回調接口。
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                //如果空閑時處理任務的handler個數為0,繼續讓線程阻塞
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }
                //判斷當前空閑時處理任務的handler是否是為空
                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            //只有第一次迭代的時候,才會執行下面代碼
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }
                //如果不保存空閑任務,執行完成后直接刪除
                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // 重置空閑的handler個數,因為不需要重復執行
            pendingIdleHandlerCount = 0;
            
            //當執行完空閑的handler的時候,新的native消息可能會進入,所以喚醒Native消息機制層
            nextPollTimeoutMillis = 0;
        }
    }
View Code
MessageQueue的next()法肯定會很懵逼。媽的,這個nativePollOnce()方法是什么鬼,為毛它會阻塞呢?這個msg.isAsynchronous()判斷又是怎么回事?媽的這個邏輯有點亂理解不了啊。大家不要慌,讓我們帶着這幾個問題來慢慢分析

Native消息機制

其實在Android 消息處理機制中,不僅包括了Java層的消息機制處理,還包括了Native消息處理機制(與我們知道的Handler機制一樣,也擁有Handler、Looper、MessageQueue)。這里我們不講Native消息機制的具體代碼細節,如果有興趣的小伙伴,請查看----->深入理解Java Binder和MessageQueue

這里我們用一張圖來表示Native消息與Jave層消息的關系(這里為大家提供了Android源碼,大家可以按需下載),具體細節如下圖所示:




static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

 在nativePollOnce()方法中調用nativeMessageQueue的pollOnce()方法,我們接着走。

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
這里我們發現NativeMessageQueue的pollOnce(timeoutMillis)內部
調用的是Native looper中的 pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)方法。繼續看
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // 先處理沒有Callback方法的 Response事件
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) { //ident大於0,則表示沒有callback, 因為POLL_CALLBACK = -2,
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }
        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        // 再處理內部輪詢
        result = pollInner(timeoutMillis); 
    }
}
這里就簡單介紹一下pollOnce()方法。 該方法會一直等待Native消息,其中 timeOutMillis參數為超時等待時間。
如果為-1,則表示無限等待,直到有事件發生為止。
如果值為0,則無需等待立即返回。
那么既然nativePollOnce()方法有可能阻塞,那么根據上文我們討論的MessageQueue中的enqueueMessage中的nativeWake()方法。大家就應該了然了。nativeWake()方法就是喚醒Native消息機制不再等待消息而直接返回。

nativePollOnce()一直循環為毛不造成主線程的卡死?

到了這里,其實大家都會有個疑問,如果當前主線程的MessageQueue沒有消息時,程序就會便阻塞在loop的queue.next()中的nativePollOnce()方法里,一直循環那么主線程為什么不卡死呢?這里就涉及到Linux pipe/epoll機制,此時主線程會釋放CPU資源進入休眠狀態,直到下個消息到達或者有事務發生,通過往pipe管道寫端寫入數據來喚醒主線程工作。這里采用的epoll機制,是一種IO多路復用機制,可以同時監控多個描述符,當某個描述符就緒(讀或寫就緒),則立刻通知相應程序進行讀或寫操作,本質同步I/O,即讀寫是阻塞的。 所以說,主線程大多數時候都是處於休眠狀態,並不會消耗大量CPU資源。

 

                      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM