在flink的數據傳輸過程中,有兩類數據,一類數據是控制流數據,比如提交作業,比如連接jm,另一類數據是業務數據。flink對此采用了不同的傳輸機制,控制流數據的傳輸采用akka進行,業務類數據傳輸在自己實現了序列化框架的前提下使用netty進行。之所以采用akka進行控制流數據的傳送,是因為akka支持異步調用,並且支持良好的並發模型。所以,了解一下akka進行消息傳送的知識,也有助於理解flink的作業運行邏輯。
這張圖反映了一個典型的消息發送過程,所有的這些對象,actor,mailbox,dispathcer等等,都存在於一個叫actorSystem的對象中。而actorSystem同時也持有一個根actor,它是所有用戶創建actor的父類,如下圖。
ActorSystem是進入到Actor的世界的一扇大門。通過它你可以創建或中止Actor。甚至還可以把整個Actor環境給關閉掉。另一方面來說,Actor是一個分層的結構,ActorSystem之於Actor有點類似於java.lang.Object或者scala.Any的角色——也就是說,它是所有Actor的根對象。當你通過ActorSystem的actorOf方法創建了一個Actor時,你其實創建的是ActorSystem下面的一個Actor。
對於一個actorSystem而言,主要的成員變量包含以下幾個:
provider:ActorRefProvider,實際創建actor的工廠
guardian:InternalActorRef,用戶創建actor的監管者
systemGuardian:InternalActorRef,系統創建actor的監管者
threadFactor:ThreadFactory,事件運行線程池模型
mailboxes:Mailboxes,存放事件的郵箱
dispatcher:ExecutionContextExecutor,負責事件分發的分發器
deadLetters:ActorRef,一個接受deadLetter的actor
而上面需要解釋的一個概念是郵箱:MailBox
默認的郵箱是UnboundedMailbox,底層其實是一個java.util.concurrent.ConcurrentLinkedQueue,它非阻塞並且無界。初次之外,akka提供了很多別的郵箱,包括SingleConsumerOnlyUnboundedMailbox、NonBlockingBoundedMailbox、UnboundedControlAwareMailbox、UnboundedPriorityMailbox、UnboundedStablePriorityMailbox等等,可以根據不同的使用場景進行配置。
另一個比較重要的概念是分發器,默認的分發器就是Dispatcher,這個模型中,每個actor都有自己的郵箱,但是他們共享一個dispatcher,這個dispatcher可以運行在不同的線程池模型上,默認的線程池模型是fork-join-executor,這個分發器是專門為非阻塞模型優化。
還有Pinned dispatcher,這個模型中每個actor有一個自己的郵箱,同時有自己的只有一個線程的線程池,不同actor之間的線程不會共享,並且底層只支持thread-pool-executor。這個模型適合於處理阻塞任務,因為他們跑在不同的線程中,比如耗時的IO操作。
除此之外還有balancing dispatcher,這個模式將嘗試從繁忙的actor重新分配工作到空閑的actor。 所有actor共享單個郵箱,並從中獲取他們的消息。 這里假定所有使用此調度器的actor都可以處理發送到其中一個actor的所有的消息;即actor屬於actor池,並且對客戶端來說沒有保證來決定哪個actor實例實際上處理某個特定的消息。 可共享性:僅對同一類型的Actor共享 郵箱:任意,為所有的Actor創建一個 使用場景:Work-sharing 底層驅動:java.util.concurrent.ExecutorService 通過”executor”指定,可使用 “fork-join-executor”, “thread-pool-executor” 或akka.dispatcher.ExecutorServiceConfigurator的限定 請注意不能將BalancingDispatcher用作一個路由器調度程序。
OK,在了解了基礎知識之后,我們來串一下發消息的流程:
1 ActorRef 2 def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender 3 Dispatch 4 def sendMessage(msg: Envelope): Unit = 5 try { 6 if (system.settings.SerializeAllMessages) { 7 val unwrapped = (msg.message match { 8 case DeadLetter(wrapped, _, _) ⇒ wrapped 9 case other ⇒ other 10 }).asInstanceOf[AnyRef] 11 if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) { 12 val s = SerializationExtension(system) 13 s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get 14 } 15 } 16 dispatcher.dispatch(this, msg) 17 } catch handleException
當我們通過!來發送消息,最后會調用到16行的dispatcher.dispatch方法。
1 protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { 2 protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { 3 val mbox = receiver.mailbox 4 mbox.enqueue(receiver.self, invocation) 5 registerForExecution(mbox, true, false) 6 } 7 8 protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { 9 if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races 10 if (mbox.setAsScheduled()) { 11 try { 12 executorService execute mbox 13 true 14 } catch { 15 case e: RejectedExecutionException ⇒ 16 try { 17 executorService execute mbox 18 true 19 } catch { //Retry once 20 case e: RejectedExecutionException ⇒ 21 mbox.setAsIdle() 22 eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!")) 23 throw e 24 } 25 } 26 } else false 27 } else false 28 }
這其中的關鍵在於12行,使用底層的線程池模型來執行這個mbox,當然,mbox能執行的前提是他本身是一個runnable對象,提交即意味着執行其中的run方法。
1 MailBox 2 override final def run(): Unit = { 3 try { 4 if (!isClosed) { //Volatile read, needed here 5 processAllSystemMessages() //First, deal with any system messages 6 processMailbox() //Then deal with messages 7 } 8 } finally { 9 setAsIdle() //Volatile write, needed here 10 dispatcher.registerForExecution(this, false, false) 11 } 12 }
其中processAllSystemMessage方法處理類似watch之類的系統消息,processMailBox處理用戶消息。
1 MailBox 2 @tailrec private final def processMailbox( 3 left: Int = java.lang.Math.max(dispatcher.throughput, 1), 4 deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit = 5 if (shouldProcessMessage) { 6 val next = dequeue() 7 if (next ne null) { 8 if (Mailbox.debug) println(actor.self + " processing message " + next) 9 actor invoke next 10 if (Thread.interrupted()) 11 throw new InterruptedException("Interrupted while processing actor messages") 12 processAllSystemMessages() 13 if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) 14 processMailbox(left - 1, deadlineNs) 15 } 16 }
processMailBox的關鍵在於第9行的代碼,真正調用這個actor本身來執行next這個消息。這里的dispatcher.throughput限制了每次執行的消息條數。
1 Actor 2 final def invoke(messageHandle: Envelope): Unit = try { 3 currentMessage = messageHandle 4 cancelReceiveTimeout() // FIXME: leave this here??? 5 messageHandle.message match { 6 case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) 7 case msg ⇒ receiveMessage(msg) 8 } 9 currentMessage = null // reset current message after successful invocation 10 } catch handleNonFatalOrInterruptedException { e ⇒ 11 handleInvokeFailure(Nil, e) 12 } finally { 13 checkReceiveTimeout // Reschedule receive timeout 14 }
invoke方法中,緊接着調用了receiveMessage方法。
1 Actor 2 final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg) 3 4 protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
這里終於看到了我們在實現一個actor的時候必然要實現的receiver方法,它在第4行最終被調用。
那么同時能夠存在多少個actor執行任務了?那就要看fork-join-pool中提供的線程的個數,以及提交的actor在執行任務的時候需要的線程個數了。雖然每一個actor在執行的時候可以觸發的消息個數是有最大值的,但是同時在執行的actor的個數應該是動態的。如果某一個actor使用了線程池中所有的線程,那可能其他actor就沒法同時執行,如果大多數actor都只使用一個線程觸發消息,則可以同時有多個actor在線程池中運行。但如果相互之間有發送消息,則只有等待,不過,akka本身就是異步的,對於大多數消息而言,發送消息之后就不管了,只等着對方處理完畢之后再發送消息給自己來實現回調。
在flink中提供了大量的默認的akka的配置,比較重要的幾個如下:
1 akka.ask.timeout:10s 阻塞操作,可能因為機器繁忙或者網絡堵塞導致timeout,可以嘗試設置大一點。 2 akka.client.timeout:60s 在client端的全部阻塞操作的時長 3 akka.fork-join-executor.parallelism-factor:2.0 ceil(available processors*factor) bounded by the min and max 4 akka.fork-join-executor.parallelism-max:64 5 akka.fork-join-executor.parallelism-min:8 6 akka.framesize:10485760b,1.25MB JM和TM之間傳輸的最大的消息值 7 akka.lookup.timeout:10s 找JM的時間 8 akka.retry-gate-closed-for:50 如果遠端的鏈接斷開,多少毫秒之內,gate應該關閉 9 akka.throughput:15 每個調度周期能夠處理的消息的最大值,小的值意味着公平,大的值意味着效率
參考了如下的地址,感謝。
https://blog.csdn.net/pzw_0612/article/details/47385177
https://www.cnblogs.com/devos/p/4438402.html
https://blog.csdn.net/birdben/article/details/49796923