簡述在akka中發送消息的過程


 

 

    在flink的數據傳輸過程中,有兩類數據,一類數據是控制流數據,比如提交作業,比如連接jm,另一類數據是業務數據。flink對此采用了不同的傳輸機制,控制流數據的傳輸采用akka進行,業務類數據傳輸在自己實現了序列化框架的前提下使用netty進行。之所以采用akka進行控制流數據的傳送,是因為akka支持異步調用,並且支持良好的並發模型。所以,了解一下akka進行消息傳送的知識,也有助於理解flink的作業運行邏輯。

這張圖反映了一個典型的消息發送過程,所有的這些對象,actor,mailbox,dispathcer等等,都存在於一個叫actorSystem的對象中。而actorSystem同時也持有一個根actor,它是所有用戶創建actor的父類,如下圖。

ActorSystem是進入到Actor的世界的一扇大門。通過它你可以創建或中止Actor。甚至還可以把整個Actor環境給關閉掉。另一方面來說,Actor是一個分層的結構,ActorSystem之於Actor有點類似於java.lang.Object或者scala.Any的角色——也就是說,它是所有Actor的根對象。當你通過ActorSystem的actorOf方法創建了一個Actor時,你其實創建的是ActorSystem下面的一個Actor。

對於一個actorSystem而言,主要的成員變量包含以下幾個:

provider:ActorRefProvider,實際創建actor的工廠
guardian:InternalActorRef,用戶創建actor的監管者
systemGuardian:InternalActorRef,系統創建actor的監管者
threadFactor:ThreadFactory,事件運行線程池模型
mailboxes:Mailboxes,存放事件的郵箱
dispatcher:ExecutionContextExecutor,負責事件分發的分發器
deadLetters:ActorRef,一個接受deadLetter的actor

而上面需要解釋的一個概念是郵箱:MailBox

默認的郵箱是UnboundedMailbox,底層其實是一個java.util.concurrent.ConcurrentLinkedQueue,它非阻塞並且無界。初次之外,akka提供了很多別的郵箱,包括SingleConsumerOnlyUnboundedMailbox、NonBlockingBoundedMailbox、UnboundedControlAwareMailbox、UnboundedPriorityMailbox、UnboundedStablePriorityMailbox等等,可以根據不同的使用場景進行配置。

另一個比較重要的概念是分發器,默認的分發器就是Dispatcher,這個模型中,每個actor都有自己的郵箱,但是他們共享一個dispatcher,這個dispatcher可以運行在不同的線程池模型上,默認的線程池模型是fork-join-executor,這個分發器是專門為非阻塞模型優化。

 

還有Pinned dispatcher,這個模型中每個actor有一個自己的郵箱,同時有自己的只有一個線程的線程池,不同actor之間的線程不會共享,並且底層只支持thread-pool-executor。這個模型適合於處理阻塞任務,因為他們跑在不同的線程中,比如耗時的IO操作。

 

 除此之外還有balancing dispatcher,這個模式將嘗試從繁忙的actor重新分配工作到空閑的actor。 所有actor共享單個郵箱,並從中獲取他們的消息。 這里假定所有使用此調度器的actor都可以處理發送到其中一個actor的所有的消息;即actor屬於actor池,並且對客戶端來說沒有保證來決定哪個actor實例實際上處理某個特定的消息。 可共享性:僅對同一類型的Actor共享 郵箱:任意,為所有的Actor創建一個 使用場景:Work-sharing 底層驅動:java.util.concurrent.ExecutorService 通過”executor”指定,可使用 “fork-join-executor”, “thread-pool-executor” 或akka.dispatcher.ExecutorServiceConfigurator的限定 請注意不能將BalancingDispatcher用作一個路由器調度程序。

 

 

 

 

OK,在了解了基礎知識之后,我們來串一下發消息的流程:

 1 ActorRef  
 2 def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender
 3 Dispatch
 4 def sendMessage(msg: Envelope): Unit =
 5     try {
 6       if (system.settings.SerializeAllMessages) {
 7         val unwrapped = (msg.message match {
 8           case DeadLetter(wrapped, _, _) ⇒ wrapped
 9           case other                     ⇒ other
10         }).asInstanceOf[AnyRef]
11         if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
12           val s = SerializationExtension(system)
13           s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
14         }
15       }
16       dispatcher.dispatch(this, msg)
17     } catch handleException

當我們通過!來發送消息,最后會調用到16行的dispatcher.dispatch方法。

 1  protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
 2  protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
 3     val mbox = receiver.mailbox
 4     mbox.enqueue(receiver.self, invocation)
 5     registerForExecution(mbox, true, false)
 6   }
 7 
 8 protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
 9     if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
10       if (mbox.setAsScheduled()) {
11         try {
12           executorService execute mbox
13           true
14         } catch {
15           case e: RejectedExecutionException ⇒
16             try {
17               executorService execute mbox
18               true
19             } catch { //Retry once
20               case e: RejectedExecutionException ⇒
21                 mbox.setAsIdle()
22                 eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
23                 throw e
24             }
25         }
26       } else false
27     } else false
28   }

這其中的關鍵在於12行,使用底層的線程池模型來執行這個mbox,當然,mbox能執行的前提是他本身是一個runnable對象,提交即意味着執行其中的run方法。

 1 MailBox
 2 override final def run(): Unit = {
 3     try {
 4       if (!isClosed) { //Volatile read, needed here
 5         processAllSystemMessages() //First, deal with any system messages
 6         processMailbox() //Then deal with messages
 7       }
 8     } finally {
 9       setAsIdle() //Volatile write, needed here
10       dispatcher.registerForExecution(this, false, false)
11     }
12   }

其中processAllSystemMessage方法處理類似watch之類的系統消息,processMailBox處理用戶消息。

 1 MailBox
 2 @tailrec private final def processMailbox(
 3     left: Int = java.lang.Math.max(dispatcher.throughput, 1),
 4     deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
 5     if (shouldProcessMessage) {
 6       val next = dequeue()
 7       if (next ne null) {
 8         if (Mailbox.debug) println(actor.self + " processing message " + next)
 9         actor invoke next
10         if (Thread.interrupted())
11           throw new InterruptedException("Interrupted while processing actor messages")
12         processAllSystemMessages()
13         if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
14           processMailbox(left - 1, deadlineNs)
15       }
16     }

processMailBox的關鍵在於第9行的代碼,真正調用這個actor本身來執行next這個消息。這里的dispatcher.throughput限制了每次執行的消息條數。

 1 Actor
 2  final def invoke(messageHandle: Envelope): Unit = try {
 3     currentMessage = messageHandle
 4     cancelReceiveTimeout() // FIXME: leave this here???
 5     messageHandle.message match {
 6       case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
 7       case msg                      ⇒ receiveMessage(msg)
 8     }
 9     currentMessage = null // reset current message after successful invocation
10   } catch handleNonFatalOrInterruptedException { e ⇒
11     handleInvokeFailure(Nil, e)
12   } finally {
13     checkReceiveTimeout // Reschedule receive timeout
14   }

invoke方法中,緊接着調用了receiveMessage方法。

1 Actor
2 final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
3 
4 protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

這里終於看到了我們在實現一個actor的時候必然要實現的receiver方法,它在第4行最終被調用。

那么同時能夠存在多少個actor執行任務了?那就要看fork-join-pool中提供的線程的個數,以及提交的actor在執行任務的時候需要的線程個數了。雖然每一個actor在執行的時候可以觸發的消息個數是有最大值的,但是同時在執行的actor的個數應該是動態的。如果某一個actor使用了線程池中所有的線程,那可能其他actor就沒法同時執行,如果大多數actor都只使用一個線程觸發消息,則可以同時有多個actor在線程池中運行。但如果相互之間有發送消息,則只有等待,不過,akka本身就是異步的,對於大多數消息而言,發送消息之后就不管了,只等着對方處理完畢之后再發送消息給自己來實現回調。

 

在flink中提供了大量的默認的akka的配置,比較重要的幾個如下:

1 akka.ask.timeout:10s 阻塞操作,可能因為機器繁忙或者網絡堵塞導致timeout,可以嘗試設置大一點。
2 akka.client.timeout:60s 在client端的全部阻塞操作的時長
3 akka.fork-join-executor.parallelism-factor:2.0 ceil(available processors*factor) bounded by the min and max
4 akka.fork-join-executor.parallelism-max:64
5 akka.fork-join-executor.parallelism-min:8
6 akka.framesize:10485760b,1.25MB JM和TM之間傳輸的最大的消息值
7 akka.lookup.timeout:10s 找JM的時間
8 akka.retry-gate-closed-for:50 如果遠端的鏈接斷開,多少毫秒之內,gate應該關閉
9 akka.throughput:15 每個調度周期能夠處理的消息的最大值,小的值意味着公平,大的值意味着效率

 

參考了如下的地址,感謝。

https://blog.csdn.net/pzw_0612/article/details/47385177

https://www.cnblogs.com/devos/p/4438402.html

https://blog.csdn.net/birdben/article/details/49796923

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM