Spark Streaming Receiving Kafka Messages, Part 4 -- The Receiver Running on the Worker


Data is obtained through distributed receivers.
A WAL (write-ahead log) is used to achieve at-least-once semantics:
conf.set("spark.streaming.receiver.writeAheadLog.enable","true") // enable the WAL
// 1. At most once - each record is processed at most once (0 or 1 times); data can be lost under this semantics.
// 2. At least once - each record is processed at least once (1 or more times); nothing is lost, but records may be processed more than once.
// 3. Exactly once - each record is processed exactly once, with no loss and no duplication; this is what everyone wants, and also the hardest to achieve.

Without fault tolerance, data can be lost: the receiver keeps receiving data, and if the executor dies before the data is processed (but after ZooKeeper has been told it was received), or the driver dies and tells the executors to shut down, whatever is still cached in memory is gone. To address this, Spark 1.2 introduced the WAL (write-ahead log). Enabling the WAL also changes the storage level used by the receiver to StorageLevel.MEMORY_AND_DISK_SER_2.

// Drawback: you cannot manage the consumed topic/partition offsets yourself.
// Benefit: with the WAL enabled, at-least-once semantics are guaranteed.
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
    ssc, kafkaParams, map, StorageLevel.MEMORY_AND_DISK_SER_2)
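
For context, here is a minimal end-to-end setup sketch. The master, checkpoint directory, ZooKeeper address, group id and topic name are placeholders; note that the receiver WAL also requires a checkpoint directory to be set (see the receivedBlockHandler code at the end of this post).

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

val conf = new SparkConf().setAppName("wal-receiver-demo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // enable the receiver WAL
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs:///tmp/wal-demo-checkpoint")                // the WAL needs a checkpoint dir

val kafkaParams = Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "wal-demo")
val topics = Map("demo-topic" -> 1)                              // topic -> number of consumer threads

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

stream.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()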

Reading data from Kafka

The driver plans where the receivers will run

org.apache.spark.streaming.StreamingContext#start starts the JobScheduler instance:

// private[streaming] val scheduler = new JobScheduler(this)

// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") { // run the body in a separate daemon thread
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  // invoke the start method
  scheduler.start()
}
state = StreamingContextState.ACTIVE

 

The source of org.apache.spark.streaming.scheduler.JobScheduler#start:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

 

 

The class documentation of ReceiverTracker reads:

This class manages the execution of the receivers of ReceiverInputDStreams. Instance of this class must be created after all input streams have been added and StreamingContext.start() has been called because it needs the final set of input streams at the time of instantiation.

 

Its start method is as follows:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    // set up the RPC endpoint (note: this endpoint lives on the driver)
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    // on the driver, send the command that launches the receivers
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
// Obtain the receivers from the ReceiverInputDStreams, distribute them to the worker nodes and run them.
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // without the WAL this is a KafkaReceiver; with the WAL enabled it is a ReliableKafkaReceiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  // run a trivial job to make sure all slave nodes are up, so that the receivers do not all end up on the same (local) node
  // (a hedged sketch of runDummySparkJob follows below)
  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // ask the driver endpoint to dispatch the start-receiver commands
}
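
For reference, a paraphrased sketch of what runDummySparkJob does (based on the Spark source of this era; not verbatim, treat the exact numbers as illustrative): it runs a small shuffle job so that executors have registered with the driver before receivers are scheduled.

// Paraphrased sketch of ReceiverTracker.runDummySparkJob; the 50/20 partition counts are just "enough parallelism".
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    // a tiny shuffle job that touches many executors
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}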

The driver-side handling of StartAllReceivers:

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    // schedule the receivers onto executors
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  ……
}

 

The source of getExecutors:

/**
 * Get the list of executors excluding driver
 */
// In local mode this returns the local block manager location; otherwise (e.g. on YARN) it returns every executor except the driver.
private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
  if (ssc.sc.isLocal) { // running in local mode
    val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
    Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
  } else { // in cluster mode, filter out the driver's block manager
    ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
      blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
    }.map { case (blockManagerId, _) =>
      ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
    }.toSeq
  }
}

The documentation of org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy#scheduleReceivers explains:

Try our best to schedule receivers with evenly distributed. However, if the preferredLocations of receivers are not even, we may not be able to schedule them evenly because we have to respect them. Here is the approach to schedule executors:
First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those host.
Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even.
This method is called when we start to launch receivers at the first time.

In short, this method spreads the receivers evenly across the worker nodes, following two rules (a toy sketch of the two-phase idea follows below):
1. Receivers with a preferred location are assigned to executors running on that host.
2. The remaining receivers are spread evenly across all executors, so that the overall distribution is even.
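
A self-contained toy sketch of that two-phase idea (this is not the real ReceiverSchedulingPolicy code; the string-based executor ids and the least-loaded heuristic are illustrative only):

// Toy illustration of "preferred hosts first, then spread evenly"; not Spark's actual implementation.
object ToyReceiverScheduling {
  // receivers: (streamId, optional preferred host); executors: "host:executorId" strings
  def schedule(receivers: Seq[(Int, Option[String])],
               executors: Seq[String]): Map[Int, Seq[String]] = {
    val load = scala.collection.mutable.Map(executors.map(_ -> 0): _*)
    receivers.map { case (id, preferred) =>
      val candidates = preferred
        .map(host => executors.filter(_.startsWith(host)))
        .filter(_.nonEmpty)
        .getOrElse(executors)
      val chosen = candidates.minBy(load)   // pick the least-loaded candidate executor
      load(chosen) += 1
      id -> Seq(chosen)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val executors = Seq("host1:1", "host2:2", "host3:3")
    val receivers = Seq((0, Some("host2")), (1, None), (2, None))
    println(schedule(receivers, executors))
    // receiver 0 lands on host2 (its preferred host); 1 and 2 go to the least-loaded remaining executors
  }
}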

org.apache.spark.streaming.scheduler.ReceiverTracker#updateReceiverScheduledExecutors maintains the mapping from receiver id to receiver tracking info; its source:

private def updateReceiverScheduledExecutors(
    receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
  val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
    case Some(oldInfo) =>
      oldInfo.copy(state = ReceiverState.SCHEDULED,
        scheduledLocations = Some(scheduledLocations))
    case None =>
      ReceiverTrackingInfo(
        receiverId,
        ReceiverState.SCHEDULED,
        Some(scheduledLocations),
        runningExecutor = None)
  }
  receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
}

 

The driver submits a distributed job to start the receivers

startReceiver is responsible for starting a receiver; its source:

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // the function that starts the receiver on a worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  // submit the distributed job that starts the receiver
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

The worker node starts the receiver supervisor

The start method of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl is as follows:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
override protected def onStart() { // start the BlockGenerator services
  registeredBlockGenerators.foreach { _.start() }
}
// the startReceiver method:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) { // the receiver registered successfully
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart() // start the receiver
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

 

Registering the receiver with the driver

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

 

Briefly, on the driver side this registers the receiver with org.apache.spark.streaming.scheduler.ReceiverTracker; the mapping from streamId to ReceiverTrackingInfo is kept in receiverTrackingInfos.

The key code of org.apache.spark.streaming.scheduler.ReceiverTracker#registerReceiver:

val name = s"${typ}-${streamId}"
val receiverTrackingInfo = ReceiverTrackingInfo(
  streamId,
  ReceiverState.ACTIVE,
  scheduledLocations = None,
  runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
  name = Some(name),
  endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))

 

Starting the receiver thread

Since the WAL is enabled, the receiver here is a ReliableKafkaReceiver instance, so receiver.onStart is org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart. Its source:

override def onStart(): Unit = {
  logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

  // Initialize the topic-partition / offset hash map.
  // 1. keeps track of the consumed offset of each topic-partition
  topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

  // Initialize the stream block id / offset snapshot hash map.
  // 2. keeps track of the mapping from block id to the partition offsets it covers
  blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

  // Initialize the block generator for storing Kafka message.
  // 3. the block generator that buffers Kafka messages; the argument is a GeneratedBlockHandler, a listener for block generator events.
  //    From its scaladoc: "Generates batches of objects received by a org.apache.spark.streaming.receiver.Receiver and puts them into
  //    appropriately named blocks at regular intervals. This class starts two threads, one to periodically start a new batch and prepare
  //    the previous batch of as a block, the other to push the blocks into the block manager."
  blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

  // 4. make sure the consumer's automatic offset commit is turned off
  //    (auto.commit.enable must end up being false)
  if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
      "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
  }

  val props = new Properties()
  kafkaParams.foreach(param => props.put(param._1, param._2))
  // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
  // we have to make sure this property is set to false to turn off auto commit mechanism in Kafka.
  props.setProperty(AUTO_OFFSET_COMMIT, "false")

  val consumerConfig = new ConsumerConfig(props)

  assert(!consumerConfig.autoCommitEnable)

  logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
  // 5. create the consumer object; consumerConnector is a ZookeeperConsumerConnector instance
  consumerConnector = Consumer.create(consumerConfig)
  logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")

  // 6. create the ZooKeeper client
  zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

  // 7. create the thread pool that processes the message streams. Its size is fixed to the total number of consumer
  //    threads requested across all topics (topics.values.sum), and each thread name gets a "KafkaMessageHandler" prefix.
  //    Internally this is a ThreadPoolExecutor whose thread factory comes from the Guava utilities.
  messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
    topics.values.sum, "KafkaMessageHandler")

  // 8. start the two threads inside the BlockGenerator
  blockGenerator.start()

  // 9. create the message streams
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[K]]

  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[V]]

  val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)

  // 10. submit one MessageHandler per stream to the thread pool
  topicMessageStreams.values.foreach { streams =>
    streams.foreach { stream =>
      messageHandlerThreadPool.submit(new MessageHandler(stream))
    }
  }
}

In step 9 above, the message streams are created.
kafka.consumer.ZookeeperConsumerConnector#createMessageStreams looks like this:

def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String, List[KafkaStream[K,V]]] = {
  if (messageStreamCreated.getAndSet(true))
    throw new MessageStreamsExistException(this.getClass.getSimpleName +
                                 " can create message streams at most once",null)
  consume(topicCountMap, keyDecoder, valueDecoder)
}
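
As a usage sketch (Kafka 0.8.x high-level consumer; the ZooKeeper address, group id and topic name below are placeholders): topicCountMap maps each topic to the number of consumer threads, and one KafkaStream is returned per thread.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream}
import kafka.serializer.StringDecoder
import kafka.utils.VerifiableProperties

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // placeholder
props.put("group.id", "demo-group")              // placeholder
props.put("auto.commit.enable", "false")
val connector = Consumer.create(new ConsumerConfig(props))

val decoder = new StringDecoder(new VerifiableProperties())
// ask for 2 consumer threads for "demo-topic" -> a List of 2 KafkaStreams comes back
val streams: Map[String, List[KafkaStream[String, String]]] =
  connector.createMessageStreams(Map("demo-topic" -> 2), decoder, decoder)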

 

It calls consume, whose source is:

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")
  // 1. build the TopicCount
  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
  // 2. get the set of consumer thread ids per topic
  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
  // 3. one (queue, stream) pair per thread id
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList
  // 4. the ZooKeeper path of this consumer group
  val dirs = new ZKGroupDirs(config.groupId)
  // 5. register this consumer under the group id in ZooKeeper
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  // 6. (re)initialize the consumer
  reinitializeConsumer(topicCount, queuesAndStreams)
  // 7. return the streams
  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}

 

The consumer consumes Kafka data

In kafka.consumer.ZookeeperConsumerConnector#consume, the following happens:

// one (queue, stream) pair per thread id
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
  threadIdSet.map(_ => {
    val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
    val stream = new KafkaStream[K,V](
      queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
    (queue, stream)
  })
).flatten.toList
// the ZooKeeper path of this consumer group
val dirs = new ZKGroupDirs(config.groupId)
// register this consumer under the group id in ZooKeeper
registerConsumerInZK(dirs, consumerIdString, topicCount)
// (re)initialize the consumer
reinitializeConsumer(topicCount, queuesAndStreams)

In this code, each newly created queue (a LinkedBlockingQueue instance) is not only handed to the KafkaStream constructor (whose iterator later drains it), it is also paired with its stream into a list of (LinkedBlockingQueue[FetchedDataChunk], KafkaStream) tuples that is then passed to reinitializeConsumer.
The source of kafka.consumer.ZookeeperConsumerConnector#reinitializeConsumer:

private def reinitializeConsumer[K,V](
    topicCount: TopicCount,
    queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
  // 1. the ZooKeeper path of this consumer group
  val dirs = new ZKGroupDirs(config.groupId)

  // listener to consumer and partition changes
  // 2. create the loadBalancerListener, which watches for consumer and partition changes
  if (loadBalancerListener == null) {
    val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(
      config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }

  // create listener for session expired event if not exist yet
  // 3. listener for session expiration; when a new session is established it notifies the load balancer
  if (sessionExpirationListener == null)
    sessionExpirationListener = new ZKSessionExpireListener(
      dirs, consumerIdString, topicCount, loadBalancerListener)

  // create listener for topic partition change event if not exist yet
  // 4. create the ZKTopicPartitionChangeListener; it notifies the load balancer when topic partitions change
  if (topicPartitionChangeListener == null)
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)

  // 5. after a series of transformations, queuesAndStreams ends up in loadBalancerListener.kafkaMessageAndMetadataStreams
  val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams

  // map of {topic -> Set(thread-1, thread-2, ...)}
  val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
    topicCount.getConsumerThreadIdsPerTopic

  val allQueuesAndStreams = topicCount match {
    case wildTopicCount: WildcardTopicCount => // wildcard subscription (createMessageStreamsByFilter)
      /*
       * Wild-card consumption streams share the same queues, so we need to
       * duplicate the list for the subsequent zip operation.
       */
      (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
    case statTopicCount: StaticTopicCount => // with an explicit topic-count map, as in our case, this branch is taken
      queuesAndStreams
  }

  val topicThreadIds = consumerThreadIdsPerTopic.map {
    case(topic, threadIds) =>
      threadIds.map((topic, _))
  }.flatten

  require(topicThreadIds.size == allQueuesAndStreams.size,
    "Mismatch between thread ID count (%d) and queue count (%d)"
    .format(topicThreadIds.size, allQueuesAndStreams.size))
  val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

  threadQueueStreamPairs.foreach(e => {
    val topicThreadId = e._1
    val q = e._2._1
    topicThreadIdAndQueues.put(topicThreadId, q)
    debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
    newGauge(
      "FetchQueueSize",
      new Gauge[Int] {
        def value = q.size
      },
      Map("clientId" -> config.clientId,
        "topic" -> topicThreadId._1,
        "threadId" -> topicThreadId._2.threadId.toString)
    )
  })

  val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
  groupedByTopic.foreach(e => {
    val topic = e._1
    val streams = e._2.map(_._2._2).toList
    topicStreamsMap += (topic -> streams)
    debug("adding topic %s and %d streams to map.".format(topic, streams.size))
  })

  // listener to consumer and partition changes
  // 6. subscribe the sessionExpirationListener via zkClient
  zkClient.subscribeStateChanges(sessionExpirationListener)
  // 7. subscribe the loadBalancerListener via zkClient
  zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
  // for each topic, subscribe the topicPartitionChangeListener via zkClient
  topicStreamsMap.foreach { topicAndStreams =>
    // register on broker partition path changes
    val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
    zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
  }

  // explicitly trigger load balancing for this consumer
  // 8. run a synchronous rebalance via loadBalancerListener
  loadBalancerListener.syncedRebalance()
}

Focus on step 8, the synchronous rebalance.
The source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance:

def syncedRebalance() {
  rebalanceLock synchronized {
    rebalanceTimer.time {
      if(isShuttingDown.get())  { // if the ZookeeperConsumerConnector has already been shut down, return immediately
        return
      } else {
        for (i <- 0 until config.rebalanceMaxRetries) { // 4 retries by default
          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
          var done = false
          var cluster: Cluster = null
          try {
            // 1. build a Cluster object from zkClient; it holds the list of brokers (broker id and its path in ZooKeeper)
            cluster = getCluster(zkClient)
            // 2. rebalance against that cluster
            done = rebalance(cluster)
          } catch {
            case e: Throwable =>
              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
                * For example, a ZK node can disappear between the time we get all children and the time we try to get
                * the value of a child. Just let this go since another rebalance will be triggered.
                **/
              info("exception during rebalance ", e)
          }
          info("end rebalancing consumer " + consumerIdString + " try #" + i)
          if (done) {
            return
          } else {
            /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
             * clear the cache */
            info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
          }
          // stop all fetchers and clear all the queues to avoid data duplication
          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
          Thread.sleep(config.rebalanceBackoffMs)
        }
      }
    }
  }

  throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}

 

Focus on step 2, the rebalance against the cluster. The source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#rebalance:

private def rebalance(cluster: Cluster): Boolean = {
  // 1. map of consumer thread ids per topic for this consumer
  val myTopicThreadIdsMap = TopicCount.constructTopicCount(
    group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
  // 2. all brokers currently available in the Kafka cluster
  val brokers = getAllBrokersInCluster(zkClient)
  if (brokers.size == 0) { // no brokers available: subscribe the listener and return true
    // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
    // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
    // are up.
    warn("no brokers found when trying to rebalance.")
    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
    true
  }
  else {
    /**
     * fetchers must be stopped to avoid data duplication, since if the current
     * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
     * But if we don't stop the fetchers first, this consumer would continue returning data for released
     * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
     */
    // 3. preparation before the rebalance
    // 3.1 close the existing fetcher connections
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    // 3.2 release partition ownership (mainly: delete the owner nodes in ZooKeeper and drop the in-memory topic -> fetcher association)
    releasePartitionOwnership(topicRegistry)
    // 3.3 re-assign partitions to fetchers
    val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
    val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
    val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
      valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

    // fetch current offsets for all topic-partitions
    // 3.4 fetch the current offsets of the assigned partitions; an "offset" here is the next offset the consumer will read
    val topicPartitions = partitionOwnershipDecision.keySet.toSeq

    val offsetFetchResponseOpt = fetchOffsets(topicPartitions)

    if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
      false
    else {
      // 3.5 update the partition -> fetcher relationship
      val offsetFetchResponse = offsetFetchResponseOpt.get
      topicPartitions.foreach(topicAndPartition => {
        val (topic, partition) = topicAndPartition.asTuple
        // requestInfo is a member of OffsetFetchResponse: a Map[TopicAndPartition, OffsetMetadataAndError]
        val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
        val threadId = partitionOwnershipDecision(topicAndPartition)
        addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
      })

      /**
       * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
       * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
       */
      if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
        allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

        partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                  .foreach { case (topic, partitionThreadPairs) =>
          newGauge("OwnedPartitionsCount",
            new Gauge[Int] {
              def value() = partitionThreadPairs.size
            },
            ownedPartitionsCountMetricTags(topic))
        }
        // 3.6 replace the old topic registry with the new one
        topicRegistry = currentTopicRegistry
        // 4. update the fetchers
        updateFetcher(cluster)
        true
      } else {
        false
      }
    }
  }
}

The source of addPartitionTopicInfo:

private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
                                    partition: Int, topic: String,
                                    offset: Long, consumerThreadId: ConsumerThreadId) {
    // if the pool has no entry for this key, valueFactory creates one and the corresponding value is returned
    val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)

    val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
    val consumedOffset = new AtomicLong(offset)
    val fetchedOffset = new AtomicLong(offset)
    val partTopicInfo = new PartitionTopicInfo(topic,
                                               partition,
                                               queue,
                                               consumedOffset,
                                               fetchedOffset,
                                               new AtomicInteger(config.fetchMessageMaxBytes),
                                               config.clientId)
    // 1. register the partition -> fetcher info in the new topic registry
    partTopicInfoMap.put(partition, partTopicInfo)
    debug(partTopicInfo + " selected new offset " + offset)
    // 2. record the consumer's consumed offset for this partition
    checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
  }

 

Step 4, updating the fetchers:

private def updateFetcher(cluster: Cluster) {
  // update partitions for fetcher
  var allPartitionInfos : List[PartitionTopicInfo] = Nil
  for (partitionInfos <- topicRegistry.values)
    for (partition <- partitionInfos.values)
      allPartitionInfos ::= partition
  info("Consumer " + consumerIdString + " selected partitions : " +
    allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))

  fetcher match {
    case Some(f) =>
      f.startConnections(allPartitionInfos, cluster)
    case None =>
  }
}

 

The actual update happens in f.startConnections. This introduces a new class, the fetcher manager kafka.consumer.ConsumerFetcherManager.

The source of kafka.consumer.ConsumerFetcherManager#startConnections:

def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
  // the LeaderFinderThread attaches fetchers to a topic's leader node once that leader is available
  leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
  leaderFinderThread.start()

  inLock(lock) {
    // update the ConsumerFetcherManager's state
    partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
    this.cluster = cluster
    noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
    cond.signalAll()
  }
}

 

ConsumerFetcherManager holds a LeaderFinderThread instance. Its parent class, kafka.utils.ShutdownableThread, implements run as follows:

override def run(): Unit = {
  info("Starting ")
  try{
    while(isRunning.get()){
      doWork()
    }
  } catch{
    case e: Throwable =>
      if(isRunning.get())
        error("Error due to ", e)
  }
  shutdownLatch.countDown()
  info("Stopped ")
}

doWork is abstract; the LeaderFinderThread subclass implements it like this:

// thread responsible for adding the fetcher to the right broker when leader is available
override def doWork() {
  // 1. build the partition -> leader broker mapping
  val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
  lock.lock()
  try {
    while (noLeaderPartitionSet.isEmpty) { // this set was populated in startConnections
      trace("No partition for leader election.")
      cond.await()
    }

    trace("Partitions without leader %s".format(noLeaderPartitionSet))
    val brokers = getAllBrokersInCluster(zkClient) // all available brokers
    // fetch the kafka.api.TopicMetadata sequence; TopicMetadata holds the topic plus partitionId, isr, leader and replicas
    val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                        brokers,
                                                        config.clientId,
                                                        config.socketTimeoutMs,
                                                        correlationId.getAndIncrement).topicsMetadata
    if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
    // 2. use the fetched metadata to update the two collections: noLeaderPartitionSet keeps the partitions whose leader is
    //    still unknown, while leaderForPartitionsMap collects the partitions whose leader has been determined.
    topicsMetadata.foreach { tmd =>
      val topic = tmd.topic
      tmd.partitionsMetadata.foreach { pmd =>
        val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
        if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
          val leaderBroker = pmd.leader.get
          leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
          noLeaderPartitionSet -= topicAndPartition
        }
      }
    }
  } catch {
    case t: Throwable => {
        if (!isRunning.get())
          throw t /* If this thread is stopped, propagate this exception to kill the thread. */
        else
          warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
      }
  } finally {
    lock.unlock()
  }

  try {
    // 3. assign a fetcher to each partition
    addFetcherForPartitions(leaderForPartitionsMap.map{
      case (topicAndPartition, broker) =>
        topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
    )
  } catch {
    case t: Throwable => {
      if (!isRunning.get())
        throw t /* If this thread is stopped, propagate this exception to kill the thread. */
      else {
        warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
        lock.lock()
        noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
        lock.unlock()
      }
    }
  }
  // 4. shut down idle fetcher threads
  shutdownIdleFetcherThreads()
  Thread.sleep(config.refreshLeaderBackoffMs)
}

 

Focus on step 3, assigning fetchers to partitions. The source of addFetcherForPartitions:

def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    // group the partitions by the fetcher that will serve them
    val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {

      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          // create and start a fetcher thread for this brokerAndFetcherId
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
        topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
    "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}

 

The source of kafka.consumer.ConsumerFetcherManager#createFetcherThread:

override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
  new ConsumerFetcherThread(
    "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
    config, sourceBroker, partitionMap, this)
}

 

First, the constructor declaration of ConsumerFetcherThread:

class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name,
                                      clientId = config.clientId,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true)

Note that the values in partitionMap are PartitionTopicInfo objects, each of which wraps the BlockingQueue[FetchedDataChunk] that fetch results are put into.
The run method is the kafka.utils.ShutdownableThread#run we saw above, so the interesting part is how this subclass implements doWork:

override def doWork() {
  inLock(partitionMapLock) { // lock, do the work, unlock
    if (partitionMap.isEmpty) // nothing to fetch: wait up to 200 ms and fall through
      partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
    partitionMap.foreach { // add every pending fetch to the fetchRequestBuilder
      case((topicAndPartition, offset)) =>
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                         offset, fetchSize)
    }
  }
  // build one batched FetchRequest
  val fetchRequest = fetchRequestBuilder.build()
  // process the FetchRequest
  if (!fetchRequest.requestInfo.isEmpty)
    processFetchRequest(fetchRequest)
}

 

The source of kafka.server.AbstractFetcherThread#processFetchRequest:

private def processFetchRequest(fetchRequest: FetchRequest) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var response: FetchResponse = null
  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    // send the request and wait for the response
    // simpleConsumer is a SimpleConsumer instance, already described, so not repeated here
    response = simpleConsumer.fetch(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning.get) {
        warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
        partitionMapLock synchronized {
          partitionsWithError ++= partitionMap.keys
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (response != null) {
    // process fetched data
    inLock(partitionMapLock) { // lock, process the response, unlock
      response.data.foreach {
        case(topicAndPartition, partitionData) =>
          val (topic, partitionId) = topicAndPartition.asTuple
          val currentOffset = partitionMap.get(topicAndPartition)
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
            partitionData.error match { // the error code decides which branch handles the data
              case ErrorMapping.NoError => // success, no error
                try {
                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                  val validBytes = messages.validBytes
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) => m.nextOffset
                    case None => currentOffset.get
                  }
                  partitionMap.put(topicAndPartition, newOffset)
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherStats.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                } catch {
                  case ime: InvalidMessageException => // an incomplete / corrupt message was fetched
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    //    should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                  case e: Throwable =>
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                             .format(topic, partitionId, currentOffset.get), e)
                }
              case ErrorMapping.OffsetOutOfRangeCode => // offset out of range error
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, newOffset)
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentOffset.get, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =>
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =>
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    ErrorMapping.exceptionFor(partitionData.error).getClass))
                  partitionsWithError += topicAndPartition
                }
            }
          }
      }
    }
  }

  if(partitionsWithError.size > 0) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}

 

processPartitionData handles the returned messages:

// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
  // partitionMap is a member field, passed in through the constructor
  val pti = partitionMap(topicAndPartition)
  if (pti.getFetchOffset != fetchOffset)
    throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                              .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
  // enqueue the data
  pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
}

So this is the point where the messages fetched from the leader finally land in the BlockingQueue[FetchedDataChunk] buffer.
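
For completeness, PartitionTopicInfo.enqueue roughly does the following (a paraphrased sketch of the Kafka 0.8.x code, not verbatim): it wraps the message set in a FetchedDataChunk, puts it on the queue, and advances the fetched offset.

// Paraphrased sketch of kafka.consumer.PartitionTopicInfo#enqueue (Kafka 0.8.x)
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if (size > 0) {
    // the next fetch should start right after the last valid message just received
    val next = messages.shallowIterator.toSeq.last.nextOffset
    chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) // blocks if the queue is full
    fetchedOffset.set(next)
  }
}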

KafkaStream reads from the queue with blocking semantics

KafkaStream is built on top of this LinkedBlockingQueue; like other streams it exposes an iterator, kafka.consumer.ConsumerIterator, which is used to walk through the data in the KafkaStream.
The main source of kafka.consumer.ConsumerIterator:

// does the iterator have another element?
def hasNext(): Boolean = {
  if(state == FAILED)
    throw new IllegalStateException("Iterator is in failed state")
  state match {
    case DONE => false
    case READY => true
    case _ => maybeComputeNext()
  }
}
// fetch the next element: the parent class implementation
def next(): T = {
  if(!hasNext())
    throw new NoSuchElementException()
  state = NOT_READY
  if(nextItem == null)
    throw new IllegalStateException("Expected item but none found.")
  nextItem
}
// fetch the next element: the ConsumerIterator override
override def next(): MessageAndMetadata[K, V] = {
  val item = super.next() // call the parent implementation
  if(consumedOffset < 0)
    throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
  currentTopicInfo.resetConsumeOffset(consumedOffset)
  val topic = currentTopicInfo.topic
  trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
  consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
  consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
  item
}
// try to compute the next element, if there might be one
def maybeComputeNext(): Boolean = {
  state = FAILED
  nextItem = makeNext()
  if(state == DONE) {
    false
  } else {
    state = READY
    true
  }
}
// produce the next element; implemented in the ConsumerIterator subclass
protected def makeNext(): MessageAndMetadata[K, V] = {
  // channel is the LinkedBlockingQueue instance, i.e. the queue member of the KafkaStream
  var currentDataChunk: FetchedDataChunk = null
  // if we don't have an iterator, get one
  var localCurrent = current.get()
  // if there is no iterator, or it is exhausted, a new chunk has to be taken from the channel
  if(localCurrent == null || !localCurrent.hasNext) {
    // remove and return the head of the queue
    if (consumerTimeoutMs < 0)
      currentDataChunk = channel.take // blocking call: waits until an element is available
    else {
      currentDataChunk = channel.poll(consumerTimeoutMs,  TimeUnit.MILLISECONDS) // blocking call with a timeout
      if (currentDataChunk == null) { // no data: reset the state to NOT_READY
        // reset state to make the iterator re-iterable
        resetState()
        throw new ConsumerTimeoutException
      }
    }
    // shutdown command
    if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
      debug("Received the shutdown command")
      return allDone // sets the state to DONE and returns null
    } else {
      currentTopicInfo = currentDataChunk.topicInfo
      val cdcFetchOffset = currentDataChunk.fetchOffset
      val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
      if (ctiConsumeOffset < cdcFetchOffset) {
        error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
          .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
        currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
      }
      localCurrent = currentDataChunk.messages.iterator

      current.set(localCurrent)
    }
    // if we just updated the current chunk and it is empty that means the fetch size is too small!
    if(currentDataChunk.messages.validBytes == 0)
      throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
                                             "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
                                             .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
  }
  var item = localCurrent.next()
  // reject the messages that have already been consumed
  while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
    item = localCurrent.next()
  }
  consumedOffset = item.nextOffset

  item.message.ensureValid() // validate checksum of message to ensure it is valid
  // return the Kafka record wrapped together with its metadata
  new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
}

 

Caching the consumed data into the WAL

Back to step 10 of org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart:

// 10. submit one MessageHandler per stream to the thread pool
topicMessageStreams.values.foreach { streams =>
  streams.foreach { stream =>
    messageHandlerThreadPool.submit(new MessageHandler(stream))
  }
}

MessageHandler is a Runnable; its run method:

override def run(): Unit = {
  while (!isStopped) {
    try {
      // 1. get the ConsumerIterator for this stream
      val streamIterator = stream.iterator()
      // 2. walk the iterator and store every message together with its metadata
      while (streamIterator.hasNext) {
        storeMessageAndMetadata(streamIterator.next)
      }
    } catch {
      case e: Exception =>
        reportError("Error handling message", e)
    }
  }
}

 

The key method of step 2, org.apache.spark.streaming.kafka.ReliableKafkaReceiver#storeMessageAndMetadata:

/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
    msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
  val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
  val data = (msgAndMetadata.key, msgAndMetadata.message)
  val metadata = (topicAndPartition, msgAndMetadata.offset)
  // add the record to the current block
  blockGenerator.addDataWithCallback(data, metadata)
}

The source of addDataWithCallback:

/**
 * Push a single data item into the buffer. After buffering the data, the
 * `BlockGeneratorListener.onAddData` callback will be called.
 */
def addDataWithCallback(data: Any, metadata: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        // 1. append the record to the buffer, where the block-forming thread will pick it up
        currentBuffer += data
        // 2. as seen when the receiver was started, the listener is the GeneratedBlockHandler instance
        listener.onAddData(data, metadata)
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}

 

Step 2 is the simpler of the two, so let's look at it first.
The source of org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler#onAddData:

def onAddData(data: Any, metadata: Any): Unit = {
  // Update the offset of the data that was added to the generator
  if (metadata != null) {
    val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
    updateOffset(topicAndPartition, offset)
  }
}
// updateOffset here is org.apache.spark.streaming.kafka.ReliableKafkaReceiver#updateOffset:
/** Update stored offset */
private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
  topicPartitionOffsetMap.put(topicAndPartition, offset)
}

 

Step 1 works like this: the BlockGenerator runs a timer that fires every block interval (200 ms by default) and checks whether currentBuffer is empty. If it is not, the buffer is wrapped into a block and put onto the queue of blocks waiting to be pushed; another thread keeps watching that queue and calls pushBlock whenever a block shows up.
The first thread, the timer, looks like this:

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

// the updateCurrentBuffer method:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

// listener.onGenerateBlock(blockId) is:
def onGenerateBlock(blockId: StreamBlockId): Unit = {
  // Remember the offsets of topics/partitions when a block has been generated
  rememberBlockOffsets(blockId)
}
// rememberBlockOffsets is:
/**
 * Remember the current offsets for each topic and partition. This is called when a block is
 * generated.
 */
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
  // Get a snapshot of current offset map and store with related block id.
  val offsetSnapshot = topicPartitionOffsetMap.toMap
  blockOffsetMap.put(blockId, offsetSnapshot)
  topicPartitionOffsetMap.clear()
}
// In other words: the topic-partition -> offset map is snapshotted and cleared,
// and a block -> (topic-partition -> offset) mapping is recorded instead.
blocksForPushing is a bounded blocking queue that another thread keeps polling:

private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

/** Keep pushing blocks to the BlockManager. */
// keep polling blocksForPushing and handle each push-block event
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) { // loop as long as the generator has not been stopped
      // poll with a timeout: remove and return the head, or give up after 10 ms
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block) // if there is a block, push it
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) { // drain whatever is left in the queue
      val block = blocksForPushing.take() // blocking call, but it returns immediately here since the queue is non-empty
      logDebug(s"Pushing block $block")
      pushBlock(block) // push the block
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}

 

The pushBlock source:

private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}

The listener it calls is org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler, whose onPushBlock is:

def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
  // Store block and commit the blocks offset
  storeBlockAndCommitOffset(blockId, arrayBuffer)
}

其中,storeBlockAndCommitOffset具體代碼如下:

 1 /**
 2  * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
 3  * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
 4  */
 5 private def storeBlockAndCommitOffset(
 6     blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
 7   var count = 0
 8   var pushed = false
 9   var exception: Exception = null
10   while (!pushed && count <= 3) { // 整個過程,總共允許3 次重試
11     try {
12       store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
13       pushed = true
14     } catch {
15       case ex: Exception =>
16         count += 1
17         exception = ex
18     }
19   }
20   if (pushed) { // 已經push block
21 // 更新 offset
22     Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
23 // 如果已經push 到 BlockManager 中,則不會再保留 block和topic-partition-> offset的映射關系
24     blockOffsetMap.remove(blockId)
25   } else {
26     stop("Error while storing block into Spark", exception)
27   }
28 }
29 // commitOffset, used above, is implemented as follows (see the ZooKeeper path illustration after this listing):
30 /**
31  * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
32  * metadata schema in Zookeeper.
33  */
34 private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
35   if (zkClient == null) {
36     val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
37     stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
38     return
39   }
40 
41   for ((topicAndPart, offset) <- offsetMap) {
42     try {
43 // resolve the consumer's partition directory in ZooKeeper
44       val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
45       val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
46       // update the consumer's committed offset for this topic-partition
47       ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
48     } catch {
49       case e: Exception =>
50         logWarning(s"Exception during commit offset $offset for topic" +
51           s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
52     }
53 
54     logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
55       s"partition ${topicAndPart.partition}")
56   }
57 }
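For orientation, with Kafka 0.8's high-level consumer conventions the path written above resolves to a node of the form /consumers/<group>/offsets/<topic>/<partition>. A small illustration of what commitOffset ends up writing (the group and topic names are placeholder values, and the layout is the one ZKGroupTopicDirs encodes):

// Illustration only: the consumer-offset path under Kafka 0.8's ZooKeeper metadata
// schema. The group, topic and partition below are hypothetical placeholders.
val groupId   = "my-consumer-group"
val topic     = "my-topic"
val partition = 0
val zkPath    = s"/consumers/$groupId/offsets/$topic/$partition"
// => /consumers/my-consumer-group/offsets/my-topic/0
// ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString) then stores the
// committed offset as the string value of this node.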

The key method store is:

1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
2 def store(dataBuffer: ArrayBuffer[T]) {
3   supervisor.pushArrayBuffer(dataBuffer, None, None)
4 }

It calls the pushArrayBuffer method of supervisor (an org.apache.spark.streaming.receiver.ReceiverSupervisorImpl instance), which does the following:

1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
2 def pushArrayBuffer(
3     arrayBuffer: ArrayBuffer[_],
4     metadataOption: Option[Any],
5     blockIdOption: Option[StreamBlockId]
6   ) {
7   pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
8 }

org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock is implemented as follows:

 1 /** Store block and report it to driver */
 2 def pushAndReportBlock(
 3     receivedBlock: ReceivedBlock,
 4     metadataOption: Option[Any],
 5     blockIdOption: Option[StreamBlockId]
 6   ) {
 7 // 1. prepare the blockId, timestamp, and related information
 8   val blockId = blockIdOption.getOrElse(nextBlockId)
 9   val time = System.currentTimeMillis
10 // 2. store the block
11   val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
12   logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
13 // 3. get the number of records stored in the block
14   val numRecords = blockStoreResult.numRecords
15 // 4. notify the trackerEndpoint that the block has been added, which triggers the driver-side WAL update
16   val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
17   trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
18   logDebug(s"Reported block $blockId")
19 }

receivedBlockHandler is assigned as follows (a minimal configuration sketch of the checkpoint requirement follows the listing):

 1 private val receivedBlockHandler: ReceivedBlockHandler = {
 2   if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
 3     if (checkpointDirOption.isEmpty) {
 4       throw new SparkException(
 5         "Cannot enable receiver write-ahead log without checkpoint directory set. " +
 6           "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
 7           "See documentation for more details.")
 8     }
 9 // with the WAL enabled and a checkpoint dir set, this branch returns a WriteAheadLogBasedBlockHandler, which holds the blockManager, streamId, storageLevel, conf, hadoopConf and checkpoint dir
10     new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
11       receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
12   } else {
13     new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
14   }
15 }
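As the exception message above makes explicit, the receiver WAL can only be used together with a checkpoint directory. A minimal configuration sketch of that requirement (the application name and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("wal-receiver-demo") // placeholder app name
val ssc  = new StreamingContext(conf, Seconds(5))

// With the receiver WAL enabled, a checkpoint directory must be set; otherwise the
// WriteAheadLogBasedBlockHandler branch above throws the SparkException shown.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path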

The storeBlock method of WriteAheadLogBasedBlockHandler (the ReceivedBlockHandler implementation used here) is shown below; a stripped-down sketch of the Future pattern it relies on follows the listing:

 1 /**
 2  * This implementation stores the block into the block manager as well as a write ahead log.
 3  * It does this in parallel, using Scala Futures, and returns only after the block has
 4  * been stored in both places.
 5  */
 6 // store the block into the BlockManager and the write-ahead log in parallel, using Scala Futures; the method returns only after both writes have completed
 7 def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 8 
 9   var numRecords = None: Option[Long]
10   // Serialize the block so that it can be inserted into both
11 // 1. serialize the ReceivedBlock into a byte buffer (no compression is applied)
12   val serializedBlock = block match { // serializedBlock holds the serialized result
13     case ArrayBufferBlock(arrayBuffer) => // go this branch
14       numRecords = Some(arrayBuffer.size.toLong)
15       blockManager.dataSerialize(blockId, arrayBuffer.iterator)
16     case IteratorBlock(iterator) =>
17       val countIterator = new CountingIterator(iterator)
18       val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
19       numRecords = countIterator.count
20       serializedBlock
21     case ByteBufferBlock(byteBuffer) =>
22       byteBuffer
23     case _ =>
24       throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
25   }
26 
27   // 2. Store the block in block manager
28   val storeInBlockManagerFuture = Future {
29     val putResult =
30       blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
31     if (!putResult.map { _._1 }.contains(blockId)) {
32       throw new SparkException(
33         s"Could not store $blockId to block manager with storage level $storageLevel")
34     }
35   }
36 
37   // 3. Store the block in write ahead log
38   val storeInWriteAheadLogFuture = Future {
39     writeAheadLog.write(serializedBlock, clock.getTimeMillis())
40   }
41 
42   // 4. Combine the futures, wait for both to complete, and return the write ahead log record handle
43   val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
44 // wait for both futures to complete; the default timeout is 30s and can be changed via spark.streaming.receiver.blockStoreTimeout
45   val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
46   // return the metadata of the stored block
47   WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
48 }
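The parallel write above is plain Scala Futures: start both writes, zip the two futures, keep only the WAL record handle, and block for at most blockStoreTimeout. A stripped-down sketch of just that pattern (the two store functions are placeholders, not the real BlockManager/WAL calls):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def storeInBlockManager(): Unit    = { /* stands in for blockManager.putBytes(...) */ }
def storeInWriteAheadLog(): String = { /* stands in for writeAheadLog.write(...) */ "wal-record-handle" }

val blockManagerFuture = Future { storeInBlockManager() }
val walFuture          = Future { storeInWriteAheadLog() }

// zip the two futures and keep only the second result, i.e. the WAL record handle (the `_._2` above)
val combined = blockManagerFuture.zip(walFuture).map(_._2)

// corresponds to blockStoreTimeout: 30s by default, tunable via
// spark.streaming.receiver.blockStoreTimeout (per the comment in the listing above)
val walRecordHandle = Await.result(combined, 30.seconds)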

Sending the WAL block info to the driver

Note the WriteAheadLogBasedStoreResult instance returned here; it is used later when the RDD is computed.
The part of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock that notifies the driver of the added block is:

1 // 4. notify the trackerEndpoint that the block has been added, which triggers the driver-side WAL update
2   val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
3   trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
4   logDebug(s"Reported block $blockId")

The driver writes the WAL block info into its own WAL

Skipping the intermediate RPC plumbing, we land in org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receiveAndReply on the driver (a note on the batching configuration follows the listing):

 1 case AddBlock(receivedBlockInfo) =>
 2   if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
 3     walBatchingThreadPool.execute(new Runnable {
 4       override def run(): Unit = Utils.tryLogNonFatalError {
 5         if (active) {
 6           context.reply(addBlock(receivedBlockInfo))
 7         } else {
 8           throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
 9         }
10       }
11     })
12   } else {
13     context.reply(addBlock(receivedBlockInfo))
14   }
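Whether these AddBlock events go through the batching thread pool is governed by a driver-side WAL setting; to the best of my knowledge the key checked by WriteAheadLogUtils.isBatchingEnabled is spark.streaming.driver.writeAheadLog.allowBatching, but treat the exact key and its default as an assumption for your Spark version:

import org.apache.spark.SparkConf

// Assumed configuration key; verify it against the Spark version in use.
val conf = new SparkConf()
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "true")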

The addBlock method is:

1 /** Add new blocks for the given stream */
2 private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
3   receivedBlockTracker.addBlock(receivedBlockInfo)
4 }

 

org.apache.spark.streaming.scheduler.ReceivedBlockTracker#addBlock is implemented as follows:

 1 /** Add received block. This event will get written to the write ahead log (if enabled). */
 2 def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
 3   try {
 4     val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
 5     if (writeResult) {
 6       synchronized {
 7         getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 8       }
 9       logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
10         s"block ${receivedBlockInfo.blockStoreResult.blockId}")
11     } else {
12       logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
13         s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
14     }
15     writeResult
16   } catch {
17     case NonFatal(e) =>
18       logError(s"Error adding block $receivedBlockInfo", e)
19       false
20   }
21 }
22 /** Write an update to the tracker to the write ahead log */
23 private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
24   if (isWriteAheadLogEnabled) {
25     logTrace(s"Writing record: $record")
26     try {
27       writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
28         clock.getTimeMillis())
29       true
30     } catch {
31       case NonFatal(e) =>
32         logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
33         false
34     }
35   } else {
36     true
37   }
38 }
39 /** Get the queue of received blocks belonging to a particular stream */
40 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
41   streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
42 }

The code above writes a BlockAdditionEvent to the driver's WAL and then appends the block info to the in-memory queue (backed by a mutable.HashMap[Int, ReceivedBlockQueue]); the map holds the streamId -> unallocated-blocks relationship. A minimal sketch of that structure follows.
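Here is that bookkeeping structure in simplified form, with a plain case class standing in for ReceivedBlockInfo (the real code keeps one queue per streamId and drains it wholesale when a batch is allocated):

import scala.collection.mutable

// Simplified stand-in for ReceivedBlockInfo
case class BlockInfo(streamId: Int, blockId: String)

// streamId -> queue of blocks that have not yet been allocated to a batch
val streamIdToUnallocatedBlockQueues =
  new mutable.HashMap[Int, mutable.Queue[BlockInfo]]

// addBlock path: append the new block info to its stream's queue
def addBlock(info: BlockInfo): Unit =
  streamIdToUnallocatedBlockQueues
    .getOrElseUpdate(info.streamId, new mutable.Queue[BlockInfo]) += info

// allocateBlocksToBatch path: drain the whole queue for a stream in one go
def drainBlocks(streamId: Int): Seq[BlockInfo] =
  streamIdToUnallocatedBlockQueues
    .getOrElseUpdate(streamId, new mutable.Queue[BlockInfo])
    .dequeueAll(_ => true)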

Reading data from the WAL-backed RDD

The createStream source is:

 1 /**
 2  * Create an input stream that pulls messages from Kafka Brokers.
 3  * @param ssc         StreamingContext object
 4  * @param kafkaParams Map of kafka configuration parameters,
 5  *                    see http://kafka.apache.org/08/configuration.html
 6  * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 7  *                    in its own thread.
 8  * @param storageLevel Storage level to use for storing the received objects
 9  * @tparam K type of Kafka message key
10  * @tparam V type of Kafka message value
11  * @tparam U type of Kafka message key decoder
12  * @tparam T type of Kafka message value decoder
13  * @return DStream of (Kafka message key, Kafka message value)
14  */
15 def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
16     ssc: StreamingContext,
17     kafkaParams: Map[String, String],
18     topics: Map[String, Int],
19     storageLevel: StorageLevel
20   ): ReceiverInputDStream[(K, V)] = {
21 // the WAL can be enabled by setting spark.streaming.receiver.writeAheadLog.enable to true
22   val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
23   new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
24 }

It creates a KafkaInputDStream:

 1 /**
 2  * Input stream that pulls messages from a Kafka Broker.
 3  *
 4  * @param kafkaParams Map of kafka configuration parameters.
 5  *                    See: http://kafka.apache.org/configuration.html
 6  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 7  * in its own thread.
 8  * @param storageLevel RDD storage level.
 9  */
10 private[streaming]
11 class KafkaInputDStream[
12   K: ClassTag,
13   V: ClassTag,
14   U <: Decoder[_]: ClassTag,
15   T <: Decoder[_]: ClassTag](
16     ssc_ : StreamingContext,
17     kafkaParams: Map[String, String],
18     topics: Map[String, Int],
19     useReliableReceiver: Boolean,
20     storageLevel: StorageLevel
21   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
22 
23   def getReceiver(): Receiver[(K, V)] = {
24     if (!useReliableReceiver) { // WAL disabled: use a plain KafkaReceiver
25       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
26     } else { // WAL enabled: use the ReliableKafkaReceiver
27       new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
28     }
29   }
30 }

 

org.apache.spark.streaming.kafka.KafkaInputDStream inherits the following compute method from its parent class (ReceiverInputDStream):

 1 /**
 2  * Generates RDDs with blocks received by the receiver of this stream. */
 3 override def compute(validTime: Time): Option[RDD[T]] = {
 4   val blockRDD = {
 5 
 6     if (validTime < graph.startTime) {
 7       // If this is called for any time before the start time of the context,
 8       // then this returns an empty RDD. This may happen when recovering from a
 9       // driver failure without any write ahead log to recover pre-failure data.
10       new BlockRDD[T](ssc.sc, Array.empty)
11     } else {
12       // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
13       // for this batch
14       val receiverTracker = ssc.scheduler.receiverTracker
15       val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
16 
17       // Register the input blocks information into InputInfoTracker
18       val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
19       ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
20 
21       // Create the BlockRDD
22       createBlockRDD(validTime, blockInfos)
23     }
24   }
25   Some(blockRDD)
26 }

getBlocksOfBatch is:

1 /** Get the blocks for the given batch and all input streams. */
2 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
3   receivedBlockTracker.getBlocksOfBatch(batchTime)
4 }
5 // which in turn calls:
6 /** Get the blocks allocated to the given batch. */
7 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
8   timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
9 }

The JobGenerator allocates WAL blocks to a batch and generates jobs

Retrieving the WAL block info

A timer is declared in org.apache.spark.streaming.scheduler.JobGenerator (a rough stand-alone analogue of it is sketched after the listing):

1 // every batch interval, the timer generates a GenerateJobs event and posts it to the eventLoop's blocking queue
2 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
3   longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
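RecurringTimer itself is essentially a periodically firing callback. A rough stand-alone analogue using a scheduled executor (this is not the actual Spark implementation, and the 5-second interval is an arbitrary example):

import java.util.concurrent.{Executors, TimeUnit}

// Every batch interval, post a "generate jobs for batch time t" message,
// mirroring eventLoop.post(GenerateJobs(new Time(longTime))) above.
val batchDurationMs = 5000L // stands in for ssc.graph.batchDuration.milliseconds
val timer = Executors.newSingleThreadScheduledExecutor()
timer.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    val batchTime = System.currentTimeMillis() / batchDurationMs * batchDurationMs
    println(s"GenerateJobs($batchTime)") // placeholder for posting to the event loop
  }
}, 0L, batchDurationMs, TimeUnit.MILLISECONDS)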

The EventLoop is instantiated as follows:

1 eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
2   override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
3 
4   override protected def onError(e: Throwable): Unit = {
5     jobScheduler.reportError("Error in job generator", e)
6   }
7 }
8 eventLoop.start()

EventLoop holds a LinkedBlockingDeque (a double-ended blocking queue) and a daemon thread that keeps taking events from it in a blocking fashion; as soon as an event is taken, onReceive is invoked, which here is the processEvent method (a simplified sketch of this pattern follows the listing below):

 1 /** Processes all events */
 2 private def processEvent(event: JobGeneratorEvent) {
 3   logDebug("Got event " + event)
 4   event match {
 5     case GenerateJobs(time) => generateJobs(time)
 6     case ClearMetadata(time) => clearMetadata(time)
 7     case DoCheckpoint(time, clearCheckpointDataLater) =>
 8       doCheckpoint(time, clearCheckpointDataLater)
 9     case ClearCheckpointData(time) => clearCheckpointData(time)
10   }
11 }
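A simplified sketch of the EventLoop pattern just described: a daemon thread blocks on a LinkedBlockingDeque and hands every event it takes to onReceive (loosely modeled on org.apache.spark.util.EventLoop, with stop/cleanup handling omitted). The JobGenerator's eventLoop has exactly this shape, with JobGeneratorEvent as the event type and processEvent as onReceive.

import java.util.concurrent.LinkedBlockingDeque

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        val event = eventQueue.take() // blocks until an event is posted
        try onReceive(event) catch { case t: Throwable => onError(t) }
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}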

Since the event is GenerateJobs, it goes on to call generateJobs:

 1 /** Generate jobs and perform checkpoint for the given `time`.  */
 2 private def generateJobs(time: Time) {
 3   // Set the SparkEnv in this thread, so that job generation code can access the environment
 4   // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
 5   // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
 6   SparkEnv.set(ssc.env)
 7   Try {
 8 // 1. allocate the WAL block info to this batch (this block metadata was sent to the driver by the workers after caching the data in the WAL)
 9     jobScheduler.receiverTracker.allocateBlocksToBatch(time)
10 // 2. generate the jobs for this batch using the allocated blocks
11     graph.generateJobs(time) // generate jobs using allocated block
12   } match {
13     case Success(jobs) =>
14       val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
15       jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
16     case Failure(e) =>
17       jobScheduler.reportError("Error generating jobs for time " + time, e)
18   }
19 // post a DoCheckpoint event: the checkpoint operation mainly writes the new checkpoint data to HDFS and deletes stale checkpoint data
20   eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
21 }

The org.apache.spark.streaming.scheduler.ReceiverTracker#allocateBlocksToBatch method invoked in step 1 is:

1 /** Allocate all unallocated blocks to the given batch. */
2 def allocateBlocksToBatch(batchTime: Time): Unit = {
3   if (receiverInputStreams.nonEmpty) {
4     receivedBlockTracker.allocateBlocksToBatch(batchTime)
5   }
6 }

org.apache.spark.streaming.scheduler.ReceivedBlockTracker#allocateBlocksToBatch is:

 1 def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
 2   if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
 3 // for each input stream, fetch the stream's queue of unallocated blocks by streamId and build a Map[streamId, Seq[ReceivedBlockInfo]]; at this point the block metadata has effectively been handed over from the receiver side
 4     // map each streamId to its WAL blocks
 5     val streamIdToBlocks = streamIds.map { streamId =>
 6         (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
 7     }.toMap
 8     val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
 9     if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
10       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
11       lastAllocatedBatchTime = batchTime
12     } else {
13       logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
14     }
15   } else {
16     // This situation occurs when:
17     // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
18     // possibly processed batch job or half-processed batch job need to be processed again,
19     // so the batchTime will be equal to lastAllocatedBatchTime.
20     // 2. Slow checkpointing makes recovered batch time older than WAL recovered
21     // lastAllocatedBatchTime.
22     // This situation will only occurs in recovery time.
23     logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
24   }
25 }

getReceivedBlockQueue is:

1 /** Get the queue of received blocks belonging to a particular stream */
2 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
3   streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
4 }

As can be seen, the block info that the worker nodes sent over is dequeued here.

 

Creating the RDD from the WAL blocks

org.apache.spark.streaming.dstream.ReceiverInputDStream#createBlockRDD is:

 1 private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
 2 
 3   if (blockInfos.nonEmpty) {
 4     val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 5    // if every block already carries a WriteAheadLogRecordHandle, create a WriteAheadLogBackedBlockRDD; otherwise create a plain BlockRDD
 6 // a WriteAheadLogRecordHandle is a handle describing a record in the WAL; its implementation FileBasedWriteAheadLogSegment carries the path, offset and length of the WAL segment, and the RDD uses this handle to read the data back from the WAL when it is actually needed
 7     // Are WAL record handles present with all the blocks
 8     val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 9 
10     if (areWALRecordHandlesPresent) {
11       // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
12       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
13       val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
14       new WriteAheadLogBackedBlockRDD[T](
15         ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
16     } else {
17       // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
18       // others then that is unexpected and log a warning accordingly.
19       if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
20         if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
21           logError("Some blocks do not have Write Ahead Log information; " +
22             "this is unexpected and data may not be recoverable after driver failures")
23         } else {
24           logWarning("Some blocks have Write Ahead Log information; this is unexpected")
25         }
26       }
27       val validBlockIds = blockIds.filter { id =>
28         ssc.sparkContext.env.blockManager.master.contains(id)
29       }
30       if (validBlockIds.size != blockIds.size) {
31         logWarning("Some blocks could not be recovered as they were not found in memory. " +
32           "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
33           "for more details.")
34       }
35       new BlockRDD[T](ssc.sc, validBlockIds)
36     }
37   } else {
38     // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
39     // according to the configuration
40     if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
41       new WriteAheadLogBackedBlockRDD[T](
42         ssc.sparkContext, Array.empty, Array.empty, Array.empty)
43     } else {
44       new BlockRDD[T](ssc.sc, Array.empty)
45     }
46   }
47 }

org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD#compute is:

 1 /**
 2  * Gets the partition data by getting the corresponding block from the block manager.
 3  * If the block does not exist, then the data is read from the corresponding record
 4  * in write ahead log files.
 5  */
 6 override def compute(split: Partition, context: TaskContext): Iterator[T] = {
 7   assertValid()
 8   val hadoopConf = broadcastedHadoopConf.value
 9   val blockManager = SparkEnv.get.blockManager
10   val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
11   val blockId = partition.blockId
12 
13   def getBlockFromBlockManager(): Option[Iterator[T]] = {
14     blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
15   }
16 
17   def getBlockFromWriteAheadLog(): Iterator[T] = {
18     var dataRead: ByteBuffer = null
19     var writeAheadLog: WriteAheadLog = null
20     try {
21       // The WriteAheadLogUtils.createLog*** method needs a directory to create a
22       // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
23       // writing log data. However, the directory is not needed if data needs to be read, hence
24       // a dummy path is provided to satisfy the method parameter requirements.
25       // FileBasedWriteAheadLog will not create any file or directory at that path.
26       // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
27       // this dummy directory should not already exist otherwise the WAL will try to recover
28       // past events from the directory and throw errors.
29       val nonExistentDirectory = new File(
30         System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
31       writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
32         SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
33       dataRead = writeAheadLog.read(partition.walRecordHandle)
34     } catch {
35       case NonFatal(e) =>
36         throw new SparkException(
37           s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
38     } finally {
39       if (writeAheadLog != null) {
40         writeAheadLog.close()
41         writeAheadLog = null
42       }
43     }
44     if (dataRead == null) {
45       throw new SparkException(
46         s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
47           s"read returned null")
48     }
49     logInfo(s"Read partition data of $this from write ahead log, record handle " +
50       partition.walRecordHandle)
51     if (storeInBlockManager) {
52       blockManager.putBytes(blockId, dataRead, storageLevel)
53       logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
54       dataRead.rewind()
55     }
56     blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
57   }
58  // if partition.isBlockIdValid is true, the block data is still present on the executors
59   if (partition.isBlockIdValid) {
60 // first try to read the block from the BlockManager (i.e. from an executor); if it is not there, fall back to reading it from the WAL
61 // does the BlockManager serve the data from memory or from disk? it fetches the block either locally or remotely:
62 // a local read may come from memory or disk, while a remote fetch is synchronous, i.e. it blocks until the block has been fetched
63     getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
64   } else {
65     getBlockFromWriteAheadLog()
66   }
67 }

 

At this point the whole path has been covered: starting the receiver, receiving data and saving it as WAL-backed blocks, the driver recording the WAL block info, and finally Spark Streaming reading the data back through the WAL-backed RDD.

 

