Android Native層異步消息處理框架


 *本文系作者工作學習總結,尚有不完善及理解不恰當之處,歡迎批評指正*

一、前言

  在NuPlayer中,可以發現許多類似於下面的代碼:

 1 //=======================================//
 2 NuPlayerDriver::NuPlayerDriver(pid_t pid)
 3     : ......
 4      mLooper(new ALooper) 
 5       ......{
 6     mLooper->setName("NuPlayerDriver Looper");
 7     mLooper->start(
 8             false, /* runOnCallingThread */
 9             true,  /* canCallJava */
10             PRIORITY_AUDIO);
11 
12     mPlayer = new NuPlayer(pid);
13     mLooper->registerHandler(mPlayer) 
14 } 
15 
16 //=======================================//
17 sp<AMessage> msg = new AMessage(kWhatPerformSeek, this);
18 msg->setInt32("generation", ++mSeekGeneration);
19 msg->setInt64("timeUs", seekTimeUs);
20 status_t err = msg->postAndAwaitResponse(&response);
21 
22 //=======================================//
23 void NuPlayer::RTSPSource::onMessageReceived(const sp<AMessage> &msg) {
24     switch (what) {
25         case XXX:
26         case YYY:
27         ......
28         default:
29     }
30 }

   這就是android在native層實現的一個異步消息處理機制,在這個機制中所有的處理都是異步的。其基本的處理流程可概述如下:

  • 將變量封裝到一個消息AMessage結構體中,然后放到消息隊列中去,后台專門有一個線程會從這個隊列中取出消息並發送給指定的AHandler處理,在handler的onMessageReceived函數中根據AMessage的mWhat字段轉向對應的分支進行處理。

  在這里我們可能會產生這樣的疑問:在很多類中都會有各種消息post出來,而后台的異步消息處理線程又是怎么知道將一個消息發送給哪個類的onMessageReceived函數處理呢?

  要搞明白這個問題,就需要分析android在native層的異步消息處理框架。

二、基礎類分析

  在android native層的異步消息處理框架中涉及到的類主要有:ALooper、AHandler、AMessage、ALooperRoster、LooperThread等。

  在android開源代碼中,這些類的聲明頭文件位於:/frameworks/av/include/media/stagefright/foundation/ ,實現功能的源代碼位於:/frameworks/av/media/libstagefright/foundation/ ,下面就分別對這些類進行分析。

  1、AMessage -- 消息的載體

  結構體AMessage是消息的載體,在傳遞消息的過程中可以攜帶各種信息。

 1 //AMessage類簡析
 2 struct AMessage : public RefBase {
 3     //構造函數,在其中會對AMessage的兩個重要數據成員mWhat和mTarget進行初始化設置。
 4     AMessage();
 5     AMessage(uint32_t what, const sp<const AHandler> &handler);
 6     void setWhat(uint32_t what);//設置mWhat
 7     uint32_t what() const;//返回mWhat
 8     void setTarget(const sp<const AHandler> &handler);//設置mTarget
 9     //一系列的set和find函數,用於在傳遞消息的過程中攜帶各種信息
10     void setInt32(const char *name, int32_t value);
11     void setInt64(const char *name, int64_t value);
12     //....
13     bool findInt32(const char *name, int32_t *value) const;
14     bool findInt64(const char *name, int64_t *value) const;
15     //投遞消息到消息隊列中
16     status_t post(int64_t delayUs = 0);
17     // Posts the message to its target and waits for a response (or error) before returning.
18     status_t postAndAwaitResponse(sp<AMessage> *response);
19 protected:
20     virtual ~AMessage();//析構函數
21 private:
22     friend struct ALooper; // deliver()
23     //兩個重要的私有數據成員:
24     //mWhat指明這是一個什么消息,用於在onMessageReceived處理分支中進行匹配。
25     //mTarget用於后台線程在處理這個消息時判斷發送給哪一個類處理。
26     uint32_t mWhat;
27     ALooper::handler_id mTarget;
28 
29     wp<AHandler> mHandler;
30     wp<ALooper> mLooper;

  進一步分析AMessage對象的構造函數,首先我們查看AMessage的source code,如下:

 1 AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler)
 2     : mWhat(what),
 3       mNumItems(0) {
 4     setTarget(handler);
 5 }
 6 
 7 void AMessage::setTarget(const sp<const AHandler> &handler) {
 8     if (handler == NULL) {
 9         mTarget = 0;
10         mHandler.clear();
11         mLooper.clear();
12     } else {
13         mTarget = handler->id();
14         mHandler = handler->getHandler();
15         mLooper = handler->getLooper();
16     }
17 }

  在構造函數中設置mWhat的值為指定的what值,用於指明這是一個什么消息,以便在onMessageReceived函數中找到匹配的分支進行處理。然后會調用setTarget函數,來設置mTarget為指定handler的mID值,mHandler為指定的handler,mLooper為與handler相關的ALooper,這樣后台處理線程就可以據此判斷將該消息發送給哪一個類(handler)進行處理了。

  構造消息並投遞出去的過程,如下示例:

1 void NuPlayer::start() {
2     (new AMessage(kWhatStart, this))->post();
3 }

   2、ALooper類

  這個類定義了消息循環和后台處理線程。

 1 //ALooper類簡析
 2 struct ALooper : public RefBase {
 3     //定義的兩個整型變量類型
 4     typedef int32_t event_id;//event id
 5     typedef int32_t handler_id;//handler id
 6     
 7     ALooper();//構造函數
 8     
 9     // Takes effect in a subsequent call to start().
10     void setName(const char *name); 
11 
12     handler_id registerHandler(const sp<AHandler> &handler);//注冊handler
13     void unregisterHandler(handler_id handlerID);//注銷handler
14     //啟動后台處理線程處理事件
15     status_t start(
16             bool runOnCallingThread = false,
17             bool canCallJava = false,
18             int32_t priority = PRIORITY_DEFAULT
19             );
20 protected:
21     virtual ~ALooper();    
22 private:
23     friend struct AMessage;       // post()
24     //事件的結構體封裝
25     struct Event {  
26         int64_t mWhenUs;
27         sp<AMessage> mMessage;
28     };
29     AString mName;
30     List<Event> mEventQueue;
31     struct LooperThread;
32     sp<LooperThread> mThread;//后台處理線程,LooperThread繼承自Thread
33     // posts a message on this looper with the given timeout
34     void post(const sp<AMessage> &msg, int64_t delayUs);
35     bool loop();
36 }

  3、LooperThread -- 后台處理線程

 1 struct ALooper::LooperThread : public Thread {
 2     LooperThread(ALooper *looper, bool canCallJava)
 3         : Thread(canCallJava),
 4           mLooper(looper),
 5           mThreadId(NULL) {
 6     }
 7 ....
 8     virtual bool threadLoop() {
 9         return mLooper->loop();
10     }
11 ....
12 protected:
13     virtual ~LooperThread() {}
14 
15 private:
16     ALooper *mLooper;
17     android_thread_id_t mThreadId;
18 };

 

  LooperThread類繼承自Thread,其函數threadLoop()用於在線程啟動后進行消息處理。在調用run()函數后便會執行threadLoop()函數中的程序,當其返回值為true時,該函數會被再次調用。

  4、AHandler -- 消息處理類的父類

  AHandler類用於對接收到的消息進行對應的處理。下面我們結合source code進行分析如下:

 1 struct AHandler : public RefBase {
 2     AHandler()    //構造函數
 3         : mID(0),    //初始化mID為0
 4           mVerboseStats(false),
 5           mMessageCounter(0) {
 6     }
 7 
 8     ALooper::handler_id id() const {//這個函數用於返回內部變量mID的值,其初始值為0,但是可以通過setID()設置
 9         return mID;
10     }
11 
12 
13 protected:
14     virtual void onMessageReceived(const sp<AMessage> &msg) = 0;
15 
16 private:
17     friend struct AMessage;      // deliverMessage()
18     friend struct ALooperRoster; // setID()
19 
20     ALooper::handler_id mID;
21     wp<ALooper> mLooper;
22     //setID()在其友元類ALooperRoster的registerHandler()函數中調用。
23     inline void setID(ALooper::handler_id id, wp<ALooper> looper) {
24         mID = id;
25         mLooper = looper;
26     }
27 
28 };

  5、ALooperRoster類

  ALooperRoster可以看做是ALooper的一個輔助類,主要用於完成消息handler的注冊與注銷工作。下面結合source code分析如下:

 1 //ALooperRoster簡析
 2 struct ALooperRoster {
 3     ALooperRoster();
 4 
 5     ALooper::handler_id registerHandler(//注冊handler
 6             const sp<ALooper> looper, const sp<AHandler> &handler);
 7 
 8     void unregisterHandler(ALooper::handler_id handlerID);//注銷handler
 9     void unregisterStaleHandlers();//注銷舊有的handler,在構造新的ALooper對象時會調用
10 private:
11     struct HandlerInfo {
12         wp<ALooper> mLooper;
13         wp<AHandler> mHandler;
14     };
15 
16     Mutex mLock;
17     KeyedVector<ALooper::handler_id, HandlerInfo> mHandlers;
18     ALooper::handler_id mNextHandlerID;
19 };

 

三、異步消息處理框架

  首先我們先copy一段代碼來看一看異步消息處理框架的搭建過程:

1 sp<ALooper> mLooper;
2 mLooper(new ALooper)//構造函數的初始化列表中
3 mLooper->setName("NuPlayerDriver Looper");
4 mLooper->start(false, true, PRIORITY_AUDIO);
5 mPlayer = new NuPlayer;// struct NuPlayer : public AHandler
6 mLooper->registerHandler(mPlayer);

  搭建框架1

  調用ALooper的構造函數,實例化一個新的ALooper對象:

1 ALooper::ALooper()
2     : mRunningLocally(false) {
3     // clean up stale AHandlers. Doing it here instead of in the destructor avoids
4     // the side effect of objects being deleted from the unregister function recursively.
5     gLooperRoster.unregisterStaleHandlers();
6 }

  搭建框架2

  調用ALooper::setName(const char *name)設置ALooper的名字:

1 void ALooper::setName(const char *name) {
2     mName = name;
3 }

  搭建框架3

  調用ALooper::start()啟動異步消息處理后台線程:

 1 status_t ALooper::start(
 2         bool runOnCallingThread, bool canCallJava, int32_t priority) {
 3     // ......
 4     
 5     if (mThread != NULL || mRunningLocally) {
 6         return INVALID_OPERATION;
 7     }
 8     mThread = new LooperThread(this, canCallJava);//構造LooperThread對象
 9     status_t err = mThread->run( //run()之后就會執行LooperThread::threadLooper()函數,在threadlooper()函數中又會調用ALooper::loop()函數
10             mName.empty() ? "ALooper" : mName.c_str(), priority);
11     if (err != OK) {
12         mThread.clear();
13     }
14     return err;
15 }

  搭建框架4

  調用ALooper::registerHandler()注冊消息處理器類AHandler:

1 ALooper::handler_id ALooper::registerHandler(const sp<AHandler> &handler) {
2     return gLooperRoster.registerHandler(this, handler);
3 }

  從這個函數的定義可以看出,能作為handler進行注冊的類都必須繼承自AHandler這個類,注冊的過程也是交給了LooperRoster::registerHandler()函數來處理的。接下來我們分析LooperRoster::registerHandler()處理的具體過程。

 1 //注冊handler的過程可以理解為建立ALooper對象與AHandler對象間的聯系
 2 ALooper::handler_id ALooperRoster::registerHandler(
 3         const sp<ALooper> looper, const sp<AHandler> &handler) {
 4     Mutex::Autolock autoLock(mLock);
 5     //由前面對AHandler的介紹可知,在AHandler構造函數中,其mID初始值為0.
 6     //調用AHandler::id()可獲取mID值
 7     //調用AHandler::setID()可設置mID值
 8     if (handler->id() != 0) {
 9         //任何一個AHandler對象只能注冊一次,未注冊的mID==0
10         CHECK(!"A handler must only be registered once.");
11         return INVALID_OPERATION;
12     }
13     //結構體ALooperRoster::HandlerInfo的定義為:
14     //struct HandlerInfo {
15     //    wp<ALooper> mLooper;
16     //    wp<AHandler> mHandler;
17     //};
18     HandlerInfo info;
19     info.mLooper = looper;
20     info.mHandler = handler;
21     //mNextHandlerID是ALooperRoster的私有數據成員,其初始值為1,每注冊一個handler其值加1
22     //gALooperRoster是一個ALooperRoster類型的全局對象,利用ALooperRoster的registerHandler()
23     //函數注冊handler過程中mNextHandlerID從1開始遞增,這樣保證每一個AHandler對象均有一個獨一無二的handler_id
24     //最后在發送Message進行處理時就是根據這個獨一無二的handler_id找到對應得handler進行處理的。
25     ALooper::handler_id handlerID = mNextHandlerID++;
26     //在gALooperRoster中有定義KeyedVector<ALooper::handler_id, HandlerInfo> mHandlers;
27     //mHandlers是一個存儲handler_id與HandlerInfo對的容器
28     //注:一個looper可以有多個handler,但一個handler只能有一個對應的looper
29     mHandlers.add(handlerID, info);
30     //設置handler的mID
31     handler->setID(handlerID, looper);
32 
33     return handlerID;
34 }

  注:LooperRoster對象gLooperRoster並不是某一個ALooper對象的私有數據成員,而是一個全局的對象,這樣就保證了在整個程序運行中利用gLooperRoster::registerHandler()進行handler注冊時,每一個AHandler對象均會有一個獨一無二的handler_id.

四、消息處理

  消息處理首先要產生消息並將其投遞出去,然后等待處理。下面是一段產生消息並投遞的實例:

1     sp<AMessage> msg = new AMessage(kWhatPerformSeek, mReflector->id());
2     msg->setInt32("generation", ++mSeekGeneration);
3     msg->setInt64("timeUs", seekTimeUs);
4     msg->post(200000ll);

 

   可以看到通過調用AMessage::post()函數將消息投遞出去,在分析post()函數前,我們先了解一下AMessage對象如何建立與ALooper、AHandler的對象的聯系?

  首先我們看一下AMessage的構造函數:

1 AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler)
2   : mWhat(what),
3   mNumItems(0) {
4     setTarget(handler);
5 }

  在構造函數中傳遞進來what(這是個什么消息)和handler(由誰處理)兩個參數,首先初始化mWhat字段,然后調用setTarget函數:

 1 void AMessage::setTarget(const sp<const AHandler> &handler) {
 2     if (handler == NULL) {
 3         mTarget = 0;
 4         mHandler.clear();
 5         mLooper.clear();
 6     } else {
 7         mTarget = handler->id();
 8         mHandler = handler->getHandler();
 9         mLooper = handler->getLooper();
10     }
11 }

  在setTarget函數中將mTarget初始化為handler_id,mTarget用於后台線程在處理這個消息時判斷發送給哪一個類處理。同時獲取與handler相關聯的ALooper對象--mLooper = handler->getLooper()。這樣就建立了AMessage對象與ALooper、AHandler的對象的聯系。

  接下來我們分析AMessage::post()函數,source code如下:

 1 status_t AMessage::post(int64_t delayUs) {
 2     sp<ALooper> looper = mLooper.promote();
 3     if (looper == NULL) {
 4         ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);
 5         return -ENOENT;
 6     }
 7 
 8     looper->post(this, delayUs);
 9     return OK;
10 }

可以看到,post之后調用了ALooper::post()來處理,下面是器source code:

 1 void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
 2     Mutex::Autolock autoLock(mLock);
 3 
 4     int64_t whenUs;
 5     if (delayUs > 0) {
 6         whenUs = GetNowUs() + delayUs;
 7     } else {
 8         whenUs = GetNowUs();
 9     }
10 
11     List<Event>::iterator it = mEventQueue.begin();
12     while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
13         ++it;
14     }
15 //上面這一段代碼,主要是遍歷消息隊列中的消息的時間,然后和我們的消息做對比,最終目的就是所有的消息必須按照時間先后順序放在隊列中等待執行。
16 //構造一個Event消息,將我們的msg設置進去
17     Event event;
18     event.mWhenUs = whenUs;
19     event.mMessage = msg;
20 
21     if (it == mEventQueue.begin()) {
22         mQueueChangedCondition.signal();
23     }
24 //將我們的消息放入到消息隊列中,等待執行
25     mEventQueue.insert(it, event);
26 }

五、后台線程消息處理

  前面有提到創建ALooper對象后調用ALooper::start()函數啟動了一個后台消息處理線程,接下來我們進行詳細分析:

 1 status_t ALooper::start(
 2         bool runOnCallingThread, bool canCallJava, int32_t priority) {
 3     //......
 4     if (mThread != NULL || mRunningLocally) {
 5         return INVALID_OPERATION;
 6     }
 7     mThread = new LooperThread(this, canCallJava);//創建LooperThread對象
 8     //**********************************************************************
 9     status_t err = mThread->run(       //
10             mName.empty() ? "ALooper" : mName.c_str(), priority);
11     //**********************************************************************        
12     if (err != OK) {
13         mThread.clear();
14     }
15     return err;
16 }

  調用LooperThread::run()函數啟動后台處理線程,就會轉去執行LooperThread::threadLoop()函數:

virtual bool threadLoop() {
    return mLooper->loop();
}

 

  在threadLoop函數中又會調用創建LooperThread對象時傳進來的ALooper的loop()函數:

 1 bool ALooper::loop() {
 2     Event event;
 3     
 4     {
 5         Mutex::Autolock autoLock(mLock);
 6         ....
 7         //取出Event隊列的隊頭消息,取出后然后把它從隊列中刪掉
 8         event = *mEventQueue.begin();
 9         mEventQueue.erase(mEventQueue.begin());
10     }
11     //調用AMessage的deliver函數發送出去執行
12     event.mMessage->deliver();
13 
14     return true;
15 }

  注:如果ALooper::loop()返回true,則threadLoop()函數會被再次調用,后台線程繼續執行,如果返回false則后台線程停止運行。

 在ALooper::loop()函數中取出隊頭的消息並調用AMessage::deliver()函數發送出去:

1 void AMessage::deliver() {
2     sp<AHandler> handler = mHandler.promote();
3     if (handler == NULL) {
4         ALOGW("failed to deliver message as target handler %d is gone.", mTarget);
5         return;
6     }
7     handler->deliverMessage(this);
8 }

 

  AMessage::deliver()函數中,

  首先得到處理message的handler -- sp<AHandler> handler = mHandler.promote();

  然后會調用AHandler::deliverMessage()函數 -- handler->deliverMessage(this);

  轉到處理該message的handler進行處理:

 1 void AHandler::deliverMessage(const sp<AMessage> &msg) {
 2     onMessageReceived(msg);
 3     mMessageCounter++;
 4 
 5     if (mVerboseStats) {
 6         uint32_t what = msg->what();
 7         ssize_t idx = mMessages.indexOfKey(what);
 8         if (idx < 0) {
 9             mMessages.add(what, 1);
10         } else {
11             mMessages.editValueAt(idx)++;
12         }
13     }
14 }

 

  在AHandler::deliverMessage()函數中調用了AHandler::onMessageReceived(msg)函數,上面有提到該函數中多多個case分支,根據message的what字段轉到對應的分支進行處理,比如在NuPlayer中:

 1 void NuPlayer::onMessageReceived(const sp<AMessage> &msg) {
 2     switch (msg->what()) {
 3         case kWhatSetDataSource:
 4         case kWhatVideoNotify:
 5         case kWhatAudioNotify:
 6         case kWhatRendererNotify:
 7         case kWhatMoreDataQueued:
 8         case kWhatReset:
 9         case kWhatSeek:
10         case kWhatPause:
11         case kWhatResume:
12         default:
13     }
14 }

  至此消息處理的整個流程結束。

六、總結

  1. 首先需要構造一個AMessage對象,必須攜帶兩個參數:what(什么消息)和handler(誰處理);
  2. ALooper中有一個后台線程--LooperThread,該線程維護着一個消息隊列List<Event> mEventQueue,線程函數不斷從這個隊列中取出消息執行;
  3. 消息的發送和取出需要調用ALooper,AMessage,AHandler等的函數。

 

 

微信掃一掃,關注玖零日記,獲取更多相關資訊源碼 -- 雖無面朝大海,依舊春暖花開

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM