Using distributed receivers to fetch data
Use the WAL to get at-least-once semantics:
conf.set("spark.streaming.receiver.writeAheadLog.enable","true") // enable the WAL
// 1. At most once - each record is processed at most once (0 or 1 times); data loss is possible under this semantic.
// 2. At least once - each record is processed at least once (1 or more times); nothing is lost, but duplicates can appear.
// 3. Exactly once - each record is processed exactly once; nothing is lost and nothing is processed twice. This is the semantic everyone wants, and also the hardest one to achieve.
Without fault tolerance, data can be lost: the receiver keeps receiving data, and if the executor dies before the data has been processed (but after ZooKeeper has been told it was received), or the driver dies and tells the executors to shut down, the records cached in memory are gone. To address this, Spark 1.2 introduced the WAL (write-ahead log). Enabling the WAL also changes the storage level the receiver uses for incoming data to StorageLevel.MEMORY_AND_DISK_SER_2.
// Drawback: we cannot manage the consumed topic/partition offsets ourselves
// Advantage: with the WAL enabled, at-least-once delivery is guaranteed
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, map, StorageLevel.MEMORY_AND_DISK_SER_2)
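For context, a minimal end-to-end setup might look like the sketch below. It is only an illustration: the topic name, group id and checkpoint path are made up, and the checkpoint directory is set because the receiver WAL lives under it (see the WriteAheadLogBasedBlockHandler discussion later in this article).

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WalKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WalKafkaWordCount")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true") // turn on the receiver WAL

    val ssc = new StreamingContext(conf, Seconds(10))
    // The WAL is written under the checkpoint directory, so one must be set (path is hypothetical).
    ssc.checkpoint("hdfs:///tmp/spark/checkpoint")

    val kafkaParams = Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "demo-group")
    val topics = Map("demo-topic" -> 2) // topic -> number of consumer threads

    // With the WAL enabled, the receiver created here is a ReliableKafkaReceiver.
    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}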
Reading data from Kafka
The driver plans where the receivers will run
org.apache.spark.streaming.StreamingContext#start starts the JobScheduler instance:
// private[streaming] val scheduler = new JobScheduler(this)

// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") { // the function body runs in a separate daemon thread
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  // call the start method
  scheduler.start()
}
state = StreamingContextState.ACTIVE
The source of org.apache.spark.streaming.scheduler.JobScheduler#start:
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
The class-level documentation of ReceiverTracker reads:
This class manages the execution of the receivers of ReceiverInputDStreams. Instances of this class must be created after all input streams have been added and StreamingContext.start() has been called, because it needs the final set of input streams at the time of instantiation.
Its start method is as follows:
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    // set up the RPC endpoint
    endpoint = ssc.env.rpcEnv.setupEndpoint( // note: this endpoint lives on the driver
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    // from the driver, send the command that launches the receivers
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
// Get the receivers from the ReceiverInputDStreams, then distribute them to the worker nodes and run them.
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // without the WAL this is a KafkaReceiver; with the WAL enabled it is a ReliableKafkaReceiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  // run a trivial job to make sure all slave nodes are up, so that the receivers do not all end up on the same local node
  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // ask the driver endpoint to dispatch the start-receiver commands
}
On the driver side, StartAllReceivers is handled as follows:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    // scheduleReceivers
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  ……
}
The source of getExecutors:
/**
 * Get the list of executors excluding driver
 */
// in local mode this returns the local thread; in yarn mode it returns the executors that are not the driver
private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
  if (ssc.sc.isLocal) { // running in local mode
    val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
    Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
  } else { // in yarn mode, filter out the driver's executor
    ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
      blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
    }.map { case (blockManagerId, _) =>
      ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
    }.toSeq
  }
}
The documentation of org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy#scheduleReceivers explains:
Try our best to schedule receivers with evenly distributed. However, if the preferredLocations of receivers are not even, we may not be able to schedule them evenly because we have to respect them. Here is the approach to schedule executors:
First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those host.
Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even.
This method is called when we start to launch receivers at the first time.
In short, the method spreads receivers evenly over the worker nodes, following two rules:
1. receivers with a preferred location are assigned to executors on those hosts;
2. all remaining receivers are distributed evenly across the executors.
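To make the two rules concrete, here is a simplified, stand-alone sketch of the idea. It is not the real ReceiverSchedulingPolicy: the function name assignReceivers and the executor-name convention are invented, and ties are broken naively.

import scala.collection.mutable

// Assign each receiver id to one executor: honor a preferred host when present,
// otherwise pick the executor that currently has the fewest receivers.
def assignReceivers(
    receivers: Seq[(Int, Option[String])], // (receiverId, preferredHost)
    executors: Seq[String]                 // e.g. "host1-exec1", "host2-exec2"
  ): Map[Int, String] = {
  val load = mutable.Map(executors.map(_ -> 0): _*)
  def leastLoaded(candidates: Seq[String]): String = candidates.minBy(load)

  val result = mutable.Map.empty[Int, String]
  // Phase 1: receivers with a preferred location go to executors on that host.
  for ((id, Some(host)) <- receivers) {
    val onHost = executors.filter(_.startsWith(host))
    val chosen = if (onHost.nonEmpty) leastLoaded(onHost) else leastLoaded(executors)
    result(id) = chosen; load(chosen) += 1
  }
  // Phase 2: everything else is spread evenly over all executors.
  for ((id, None) <- receivers) {
    val chosen = leastLoaded(executors)
    result(id) = chosen; load(chosen) += 1
  }
  result.toMap
}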
org.apache.spark.streaming.scheduler.ReceiverTracker#updateReceiverScheduledExecutors maintains the mapping from receiver id to receiver tracking info; its source is:
private def updateReceiverScheduledExecutors(
    receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
  val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
    case Some(oldInfo) =>
      oldInfo.copy(state = ReceiverState.SCHEDULED,
        scheduledLocations = Some(scheduledLocations))
    case None =>
      ReceiverTrackingInfo(
        receiverId,
        ReceiverState.SCHEDULED,
        Some(scheduledLocations),
        runningExecutor = None)
  }
  receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
}
The driver submits the distributed job that starts the receivers
startReceiver is responsible for launching a receiver; its source is:
/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // the function that starts the receiver on a worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  // submit the distributed job that starts the receiver
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
The worker node starts the receiver supervisor
The start method of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
override protected def onStart() { // start the BlockGenerator services
  registeredBlockGenerators.foreach { _.start() }
}
// the startReceiver method:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) { // the receiver registered successfully
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart() // start the receiver
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
Registering the receiver with the driver
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}
Briefly, the driver side simply brings the receiver under the management of org.apache.spark.streaming.scheduler.ReceiverTracker; the mapping from streamId to ReceiverTrackingInfo is kept in receiverTrackingInfos.
The key part of org.apache.spark.streaming.scheduler.ReceiverTracker#registerReceiver:
val name = s"${typ}-${streamId}"
val receiverTrackingInfo = ReceiverTrackingInfo(
  streamId,
  ReceiverState.ACTIVE,
  scheduledLocations = None,
  runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
  name = Some(name),
  endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
Starting the receiver thread
Because we have enabled the WAL, the receiver here is an instance of ReliableKafkaReceiver.
receiver.onStart is org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart, whose source is:
override def onStart(): Unit = {
  logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

  // Initialize the topic-partition / offset hash map.
  // 1. keeps the mapping from consumed topic-partition to offset
  topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

  // Initialize the stream block id / offset snapshot hash map.
  // 2. keeps the mapping from block id to partition offsets
  blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

  // Initialize the block generator for storing Kafka message.
  // 3. the block generator that buffers Kafka messages; the argument is a GeneratedBlockHandler instance,
  //    a listener for block generator events.
  //    "Generates batches of objects received by a org.apache.spark.streaming.receiver.Receiver and puts them into
  //    appropriately named blocks at regular intervals. This class starts two threads, one to periodically start a
  //    new batch and prepare the previous batch of as a block, the other to push the blocks into the block manager."
  blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
  // 4. turn off the consumer's automatic offset commit
  //    auto_offset_commit should be false
  if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
      "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
  }

  val props = new Properties()
  kafkaParams.foreach(param => props.put(param._1, param._2))
  // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
  // we have to make sure this property is set to false to turn off auto commit mechanism in Kafka.
  props.setProperty(AUTO_OFFSET_COMMIT, "false")

  val consumerConfig = new ConsumerConfig(props)

  assert(!consumerConfig.autoCommitEnable)

  logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
  // 5. initialize the consumer object
  //    consumerConnector is a ZookeeperConsumerConnector instance
  consumerConnector = Consumer.create(consumerConfig)
  logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
  // 6. initialize the ZooKeeper client
  zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
  // 7. create a fixed-size thread pool to handle the message streams; its size is the total number of partitions
  //    and each thread name gets the given prefix. Internally this is a ThreadPoolExecutor whose thread factory
  //    comes from the Guava utilities.
  messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
    topics.values.sum, "KafkaMessageHandler")
  // 8. start the two threads inside the BlockGenerator
  blockGenerator.start()

  // 9. create the message streams
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[K]]

  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[V]]

  val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)
  // 10. submit one MessageHandler per stream to the thread pool
  topicMessageStreams.values.foreach { streams =>
    streams.foreach { stream =>
      messageHandlerThreadPool.submit(new MessageHandler(stream))
    }
  }
}
Step 9 above creates the message streams. The kafka.consumer.ZookeeperConsumerConnector#createMessageStreams method looks like this:
def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String, List[KafkaStream[K,V]]] = {
  if (messageStreamCreated.getAndSet(true))
    throw new MessageStreamsExistException(this.getClass.getSimpleName +
      " can create message streams at most once",null)
  consume(topicCountMap, keyDecoder, valueDecoder)
}
It calls the consume method, whose source is:
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")
  // 1. build the TopicCount
  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
  // 2. get the mapping from each topic to its set of threadIds
  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
  // 3. build a (queue, stream) pair for every threadId
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList

  // 4. the ZooKeeper path of the groupId
  val dirs = new ZKGroupDirs(config.groupId)
  // 5. register the consumer under the groupId in ZooKeeper
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  // 6. re-initialize the consumer
  reinitializeConsumer(topicCount, queuesAndStreams)
  // 7. return the streams
  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
The consumer consumes data from Kafka
Inside kafka.consumer.ZookeeperConsumerConnector#consume, the following happens:
// build a (queue, stream) pair for every threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
  threadIdSet.map(_ => {
    val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
    val stream = new KafkaStream[K,V](
      queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
    (queue, stream)
  })
).flatten.toList
// the ZooKeeper path of the groupId
val dirs = new ZKGroupDirs(config.groupId)
// register the consumer under the groupId in ZooKeeper
registerConsumerInZK(dirs, consumerIdString, topicCount)
// re-initialize the consumer
reinitializeConsumer(topicCount, queuesAndStreams)
In the code above, the newly created queue (a LinkedBlockingQueue) is not only handed to the KafkaStream constructor, where an iterator pulls data from it, but is also paired with the stream into a list of Tuple2[LinkedBlockingQueue[FetchedDataChunk], KafkaStream] that is then passed to reinitializeConsumer.
The source of kafka.consumer.ZookeeperConsumerConnector#reinitializeConsumer:
private def reinitializeConsumer[K,V](
    topicCount: TopicCount,
    queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
  // 1. the ZooKeeper path of this groupId
  val dirs = new ZKGroupDirs(config.groupId)

  // listener to consumer and partition changes
  // 2. initialize loadBalancerListener; this rebalance listener keeps watching for consumer and partition changes
  if (loadBalancerListener == null) {
    val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(
      config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }

  // create listener for session expired event if not exist yet
  // 3. listener for session expiration; when a new session is registered, it notifies the load balancer
  if (sessionExpirationListener == null)
    sessionExpirationListener = new ZKSessionExpireListener(
      dirs, consumerIdString, topicCount, loadBalancerListener)

  // create listener for topic partition change event if not exist yet
  // 4. initialize the ZKTopicPartitionChangeListener; when topic partitions change, it notifies the load balancer
  if (topicPartitionChangeListener == null)
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
  // 5. transform queuesAndStreams and add the result to loadBalancerListener.kafkaMessageAndMetadataStreams
  val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams

  // map of {topic -> Set(thread-1, thread-2, ...)}
  val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
    topicCount.getConsumerThreadIdsPerTopic

  val allQueuesAndStreams = topicCount match {
    case wildTopicCount: WildcardTopicCount => // here it is a WildcardTopicCount, so this branch is taken
      /*
       * Wild-card consumption streams share the same queues, so we need to
       * duplicate the list for the subsequent zip operation.
       */
      (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
    case statTopicCount: StaticTopicCount =>
      queuesAndStreams
  }

  val topicThreadIds = consumerThreadIdsPerTopic.map {
    case(topic, threadIds) =>
      threadIds.map((topic, _))
  }.flatten

  require(topicThreadIds.size == allQueuesAndStreams.size,
    "Mismatch between thread ID count (%d) and queue count (%d)"
      .format(topicThreadIds.size, allQueuesAndStreams.size))
  val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

  threadQueueStreamPairs.foreach(e => {
    val topicThreadId = e._1
    val q = e._2._1
    topicThreadIdAndQueues.put(topicThreadId, q)
    debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
    newGauge(
      "FetchQueueSize",
      new Gauge[Int] {
        def value = q.size
      },
      Map("clientId" -> config.clientId,
        "topic" -> topicThreadId._1,
        "threadId" -> topicThreadId._2.threadId.toString)
    )
  })

  val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
  groupedByTopic.foreach(e => {
    val topic = e._1
    val streams = e._2.map(_._2._2).toList
    topicStreamsMap += (topic -> streams)
    debug("adding topic %s and %d streams to map.".format(topic, streams.size))
  })

  // listener to consumer and partition changes
  // 6. register the sessionExpirationListener with the zkClient
  zkClient.subscribeStateChanges(sessionExpirationListener)
  // 7. register the loadBalancerListener with the zkClient
  zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
  // for every topic, register the topicPartitionChangeListener with the zkClient
  topicStreamsMap.foreach { topicAndStreams =>
    // register on broker partition path changes
    val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
    zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
  }

  // explicitly trigger load balancing for this consumer
  // 8. run a synchronous rebalance through the loadBalancerListener
  loadBalancerListener.syncedRebalance()
}
Focus on step 8, the synchronous rebalance through loadBalancerListener.
The source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance:
def syncedRebalance() {
  rebalanceLock synchronized {
    rebalanceTimer.time {
      if(isShuttingDown.get()) { // if the ZookeeperConsumerConnector has already shut down, return immediately
        return
      } else {
        for (i <- 0 until config.rebalanceMaxRetries) { // defaults to 4 attempts
          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
          var done = false
          var cluster: Cluster = null
          try {
            // 1. build a Cluster object from the zkClient; it holds the list of brokers (broker id and its path in ZooKeeper)
            cluster = getCluster(zkClient)
            // 2. perform the rebalance against this cluster
            done = rebalance(cluster)
          } catch {
            case e: Throwable =>
              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
               * For example, a ZK node can disappear between the time we get all children and the time we try to get
               * the value of a child. Just let this go since another rebalance will be triggered.
               **/
              info("exception during rebalance ", e)
          }
          info("end rebalancing consumer " + consumerIdString + " try #" + i)
          if (done) {
            return
          } else {
            /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
             * clear the cache */
            info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
          }
          // stop all fetchers and clear all the queues to avoid data duplication
          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
          Thread.sleep(config.rebalanceBackoffMs)
        }
      }
    }
  }

  throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}
Focus on step 2, the rebalance against the cluster. The source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#rebalance:
private def rebalance(cluster: Cluster): Boolean = {
  // 1. get the mapping from the group to its threadIds
  val myTopicThreadIdsMap = TopicCount.constructTopicCount(
    group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
  // 2. get all available brokers in the Kafka cluster
  val brokers = getAllBrokersInCluster(zkClient)
  if (brokers.size == 0) { // if no broker is available, subscribe the listener and return true
    // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
    // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
    // are up.
    warn("no brokers found when trying to rebalance.")
    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
    true
  }
  else {
    /**
     * fetchers must be stopped to avoid data duplication, since if the current
     * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
     * But if we don't stop the fetchers first, this consumer would continue returning data for released
     * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
     */
    // 3. preparation before the rebalance
    // 3.1 close the existing fetcher connections
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    // 3.2 release partition ownership (delete the owner nodes in ZooKeeper and drop the in-memory topic-to-fetcher associations)
    releasePartitionOwnership(topicRegistry)
    // 3.3 reassign fetchers to partitions
    val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
    val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
    val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
      valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

    // fetch current offsets for all topic-partitions
    // 3.4 fetch the current offsets of the owned partitions; here "offset" means the next offset the consumer will consume
    val topicPartitions = partitionOwnershipDecision.keySet.toSeq

    val offsetFetchResponseOpt = fetchOffsets(topicPartitions)

    if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
      false
    else {
      // 3.5 update the partition-to-fetcher mapping
      val offsetFetchResponse = offsetFetchResponseOpt.get
      topicPartitions.foreach(topicAndPartition => {
        val (topic, partition) = topicAndPartition.asTuple
        // requestInfo is a member of OffsetFetchResponse; it is a Map[TopicAndPartition, OffsetMetadataAndError]
        val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
        val threadId = partitionOwnershipDecision(topicAndPartition)
        addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
      })

      /**
       * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
       * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
       */
      if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
        allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

        partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
          .foreach { case (topic, partitionThreadPairs) =>
            newGauge("OwnedPartitionsCount",
              new Gauge[Int] {
                def value() = partitionThreadPairs.size
              },
              ownedPartitionsCountMetricTags(topic))
          }
        // 3.6 replace the old topic registry with the new one
        topicRegistry = currentTopicRegistry
        // 4. update the fetchers
        updateFetcher(cluster)
        true
      } else {
        false
      }
    }
  }
}
The source of addPartitionTopicInfo:
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
                                  partition: Int, topic: String,
                                  offset: Long, consumerThreadId: ConsumerThreadId) {
  // if the map has no entry for this key, valueFactory initializes one and the corresponding value is returned
  val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)

  val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
  val consumedOffset = new AtomicLong(offset)
  val fetchedOffset = new AtomicLong(offset)
  val partTopicInfo = new PartitionTopicInfo(topic,
    partition,
    queue,
    consumedOffset,
    fetchedOffset,
    new AtomicInteger(config.fetchMessageMaxBytes),
    config.clientId)
  // 1. register it in the new topic registry, i.e. record the partition-to-fetcher relation
  partTopicInfoMap.put(partition, partTopicInfo)
  debug(partTopicInfo + " selected new offset " + offset)
  // 2. update the consumer's already-consumed offset
  checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
}
Step 4, updating the fetchers; the source:
private def updateFetcher(cluster: Cluster) {
  // update partitions for fetcher
  var allPartitionInfos : List[PartitionTopicInfo] = Nil
  for (partitionInfos <- topicRegistry.values)
    for (partition <- partitionInfos.values)
      allPartitionInfos ::= partition
  info("Consumer " + consumerIdString + " selected partitions : " +
    allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))

  fetcher match {
    case Some(f) =>
      f.startConnections(allPartitionInfos, cluster)
    case None =>
  }
}
The real update happens in f.startConnections. This introduces a new class, the fetcher: kafka.consumer.ConsumerFetcherManager.
The source of kafka.consumer.ConsumerFetcherManager#startConnections:
def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
  // the LeaderFinderThread adds fetchers to a topic's leader node once the leader is available
  leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
  leaderFinderThread.start()

  inLock(lock) {
    // update the ConsumerFetcherManager's member variables
    partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
    this.cluster = cluster
    noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
    cond.signalAll()
  }
}
ConsumerFetcherManager owns a LeaderFinderThread instance. Its parent class, kafka.utils.ShutdownableThread, has the following run method:
override def run(): Unit = {
  info("Starting ")
  try{
    while(isRunning.get()){
      doWork()
    }
  } catch{
    case e: Throwable =>
      if(isRunning.get())
        error("Error due to ", e)
  }
  shutdownLatch.countDown()
  info("Stopped ")
}
doWork is an abstract method; the LeaderFinderThread subclass implements it as follows:
// thread responsible for adding the fetcher to the right broker when leader is available
override def doWork() {
  // 1. mapping from partition to its leader node
  val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
  lock.lock()
  try {
    while (noLeaderPartitionSet.isEmpty) { // this field was already populated in startConnections
      trace("No partition for leader election.")
      cond.await()
    }

    trace("Partitions without leader %s".format(noLeaderPartitionSet))
    val brokers = getAllBrokersInCluster(zkClient) // all available broker nodes
    // fetch the kafka.api.TopicMetadata sequence; TopicMetadata holds the topic together with partitionId, isr, leader and replicas
    val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
      brokers,
      config.clientId,
      config.socketTimeoutMs,
      correlationId.getAndIncrement).topicsMetadata
    if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
    // 2. based on the partition-to-leader information, update the noLeaderPartitionSet and leaderForPartitionsMap collections:
    //    noLeaderPartitionSet holds the partitions whose leader is still unknown,
    //    leaderForPartitionsMap holds the partitions whose leader has been determined.
    topicsMetadata.foreach { tmd =>
      val topic = tmd.topic
      tmd.partitionsMetadata.foreach { pmd =>
        val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
        if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
          val leaderBroker = pmd.leader.get
          leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
          noLeaderPartitionSet -= topicAndPartition
        }
      }
    }
  } catch {
    case t: Throwable => {
      if (!isRunning.get())
        throw t /* If this thread is stopped, propagate this exception to kill the thread. */
      else
        warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
    }
  } finally {
    lock.unlock()
  }

  try {
    // 3. assign a fetcher to each partition
    addFetcherForPartitions(leaderForPartitionsMap.map{
      case (topicAndPartition, broker) =>
        topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
    )
  } catch {
    case t: Throwable => {
      if (!isRunning.get())
        throw t /* If this thread is stopped, propagate this exception to kill the thread. */
      else {
        warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
        lock.lock()
        noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
        lock.unlock()
      }
    }
  }
  // 4. shut down idle fetcher threads
  shutdownIdleFetcherThreads()
  Thread.sleep(config.refreshLeaderBackoffMs)
}
Focus on step 3, assigning fetchers to partitions. The source of addFetcherForPartitions:
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    // group the partitions by the fetcher that will serve them
    val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {

      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          // create the fetcher for this brokerAndFetcherId and start it
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
        topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
    "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}
The source of kafka.consumer.ConsumerFetcherManager#createFetcherThread:
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
  new ConsumerFetcherThread(
    "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
    config, sourceBroker, partitionMap, this)
}
First, the constructor declaration of ConsumerFetcherThread:
class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name,
                                      clientId = config.clientId,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true)
Note that the values in partitionMap are PartitionTopicInfo objects, each of which wraps the BlockingQueue[FetchedDataChunk] instance where the fetch results are stored.
Its run method is kafka.utils.ShutdownableThread#run, which we have already seen above; what matters is how this subclass overrides doWork:
override def doWork() {
  inLock(partitionMapLock) { // lock, run, unlock
    if (partitionMap.isEmpty) // if there is nothing to fetch, wait up to 200ms and return
      partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
    partitionMap.foreach { // add every pending fetch to the fetchRequestBuilder
      case((topicAndPartition, offset)) =>
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
          offset, fetchSize)
    }
  }
  // build the batched FetchRequest
  val fetchRequest = fetchRequestBuilder.build()
  // process the FetchRequest
  if (!fetchRequest.requestInfo.isEmpty)
    processFetchRequest(fetchRequest)
}
The source of kafka.server.AbstractFetcherThread#processFetchRequest:
private def processFetchRequest(fetchRequest: FetchRequest) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var response: FetchResponse = null
  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    // send the request and collect the response
    // simpleConsumer is a SimpleConsumer instance, as noted earlier
    response = simpleConsumer.fetch(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning.get) {
        warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
        partitionMapLock synchronized {
          partitionsWithError ++= partitionMap.keys
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (response != null) {
    // process fetched data
    inLock(partitionMapLock) { // take the lock, process the response, release the lock
      response.data.foreach {
        case(topicAndPartition, partitionData) =>
          val (topic, partitionId) = topicAndPartition.asTuple
          val currentOffset = partitionMap.get(topicAndPartition)
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
            partitionData.error match { // the error code decides which branch handles the response
              case ErrorMapping.NoError => // successful response, no error
                try {
                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                  val validBytes = messages.validBytes
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) => m.nextOffset
                    case None => currentOffset.get
                  }
                  partitionMap.put(topicAndPartition, newOffset)
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherStats.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                } catch {
                  case ime: InvalidMessageException => // the fetched message is incomplete or corrupt
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    //    should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                  case e: Throwable =>
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                      .format(topic, partitionId, currentOffset.get), e)
                }
              case ErrorMapping.OffsetOutOfRangeCode => // offset out of range error
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, newOffset)
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentOffset.get, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =>
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =>
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    ErrorMapping.exceptionFor(partitionData.error).getClass))
                  partitionsWithError += topicAndPartition
                }
            }
          }
      }
    }
  }

  if(partitionsWithError.size > 0) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}
processPartitionData, which handles the actual returned messages, is:
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
  // partitionMap is a member variable, passed in through the constructor
  val pti = partitionMap(topicAndPartition)
  if (pti.getFetchOffset != fetchOffset)
    throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
      .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
  // enqueue the data
  pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
}
So this is where, finally, the messages fetched from the leader are put into the BlockingQueue[FetchedDataChunk] buffer. The handoff pattern is sketched below.
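Stripped of Kafka's details, the fetcher/stream handoff is a classic bounded-queue producer/consumer pattern. The following stand-alone sketch (Chunk and all names are made up, not Kafka's classes) illustrates why the consumer side can simply block on take while the fetcher keeps enqueueing:

import java.util.concurrent.LinkedBlockingQueue

object QueueHandoffDemo {
  final case class Chunk(payload: String) // stand-in for FetchedDataChunk

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Chunk](1000) // bounded, like queuedMaxMessages

    // "fetcher" side: put blocks when the queue is full, giving back-pressure
    val producer = new Thread(new Runnable {
      override def run(): Unit = {
        (1 to 5).foreach(i => queue.put(Chunk(s"chunk-$i")))
        queue.put(Chunk("shutdown")) // sentinel, playing the role of shutdownCommand
      }
    })

    // "KafkaStream iterator" side: take blocks until a chunk is available
    val consumer = new Thread(new Runnable {
      override def run(): Unit = {
        Iterator.continually(queue.take())
          .takeWhile(_.payload != "shutdown")
          .foreach(c => println(s"consumed ${c.payload}"))
      }
    })

    producer.start(); consumer.start()
    producer.join(); consumer.join()
  }
}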
KafkaStream reads from the queue in a blocking fashion
KafkaStream is built on this LinkedBlockingQueue. Accordingly, KafkaStream exposes an iterator, kafka.consumer.ConsumerIterator, which is used to iterate over the data in the KafkaStream.
The main source of kafka.consumer.ConsumerIterator:
// is there a next element?
def hasNext(): Boolean = {
  if(state == FAILED)
    throw new IllegalStateException("Iterator is in failed state")
  state match {
    case DONE => false
    case READY => true
    case _ => maybeComputeNext()
  }
}
// get the next element - parent class implementation
def next(): T = {
  if(!hasNext())
    throw new NoSuchElementException()
  state = NOT_READY
  if(nextItem == null)
    throw new IllegalStateException("Expected item but none found.")
  nextItem
}
// get the next element - the ConsumerIterator subclass implementation
override def next(): MessageAndMetadata[K, V] = {
  val item = super.next() // call the parent implementation
  if(consumedOffset < 0)
    throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
  currentTopicInfo.resetConsumeOffset(consumedOffset)
  val topic = currentTopicInfo.topic
  trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
  consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
  consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
  item
}
// try to compute the next element, if any
def maybeComputeNext(): Boolean = {
  state = FAILED
  nextItem = makeNext()
  if(state == DONE) {
    false
  } else {
    state = READY
    true
  }
}
// build the next element; implemented in the ConsumerIterator subclass
protected def makeNext(): MessageAndMetadata[K, V] = {
  // channel is the LinkedBlockingQueue instance, i.e. the queue member of the KafkaStream
  var currentDataChunk: FetchedDataChunk = null
  // if we don't have an iterator, get one
  var localCurrent = current.get()
  // if there is no iterator, or it has no more elements, take a chunk from the channel
  if(localCurrent == null || !localCurrent.hasNext) {
    // remove and return the head of the queue
    if (consumerTimeoutMs < 0)
      currentDataChunk = channel.take // blocking call; waits until an element is available
    else {
      currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) // blocking call with a timeout
      if (currentDataChunk == null) { // no data: reset the state to NOT_READY
        // reset state to make the iterator re-iterable
        resetState()
        throw new ConsumerTimeoutException
      }
    }
    // shutdown command
    if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
      debug("Received the shutdown command")
      return allDone // sets the state to DONE and returns null
    } else {
      currentTopicInfo = currentDataChunk.topicInfo
      val cdcFetchOffset = currentDataChunk.fetchOffset
      val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
      if (ctiConsumeOffset < cdcFetchOffset) {
        error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
          .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
        currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
      }
      localCurrent = currentDataChunk.messages.iterator

      current.set(localCurrent)
    }
    // if we just updated the current chunk and it is empty that means the fetch size is too small!
    if(currentDataChunk.messages.validBytes == 0)
      throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
        "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
        .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
  }
  var item = localCurrent.next()
  // reject the messages that have already been consumed
  while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
    item = localCurrent.next()
  }
  consumedOffset = item.nextOffset

  item.message.ensureValid() // validate checksum of message to ensure it is valid
  // return the decoded Kafka record wrapped in a MessageAndMetadata
  new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
}
Caching the consumed data in the WAL
Back to step 10 of org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart:
// 10. submit one MessageHandler per stream to the thread pool
topicMessageStreams.values.foreach { streams =>
  streams.foreach { stream =>
    messageHandlerThreadPool.submit(new MessageHandler(stream))
  }
}
MessageHandler is a Runnable; its run method is:
override def run(): Unit = {
  while (!isStopped) {
    try {
      // 1. get the ConsumerIterator for this stream
      val streamIterator = stream.iterator()
      // 2. iterate over every record and store the message together with its metadata
      while (streamIterator.hasNext) {
        storeMessageAndMetadata(streamIterator.next)
      }
    } catch {
      case e: Exception =>
        reportError("Error handling message", e)
    }
  }
}
The key method in step 2, org.apache.spark.streaming.kafka.ReliableKafkaReceiver#storeMessageAndMetadata, is:
/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
    msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
  val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
  val data = (msgAndMetadata.key, msgAndMetadata.message)
  val metadata = (topicAndPartition, msgAndMetadata.offset)
  // add the record to the current block
  blockGenerator.addDataWithCallback(data, metadata)
}
The source of addDataWithCallback:
/**
 * Push a single data item into the buffer. After buffering the data, the
 * `BlockGeneratorListener.onAddData` callback will be called.
 */
def addDataWithCallback(data: Any, metadata: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        // 1. put the record into the buffer, from which the processing thread will take it
        currentBuffer += data
        // 2. as seen when the receiver was started, the listener is the GeneratedBlockHandler instance
        listener.onAddData(data, metadata)
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
Step 2 is the simpler one, so let's look at it first.
The source of org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler#onAddData:
def onAddData(data: Any, metadata: Any): Unit = {
  // Update the offset of the data that was added to the generator
  if (metadata != null) {
    val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
    updateOffset(topicAndPartition, offset)
  }
}
// updateOffset here is org.apache.spark.streaming.kafka.ReliableKafkaReceiver#updateOffset:
/** Update stored offset */
private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
  topicPartitionOffsetMap.put(topicAndPartition, offset)
}
Step 1 works as follows.
The BlockGenerator has a timer that fires periodically (every 200 ms) and checks whether currentBuffer is empty. If it is not, the buffer is turned into a block and placed in the queue of blocks waiting to be pushed; a second thread keeps watching that queue and calls pushBlock whenever a block shows up.
The timer thread is as follows:
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

// the updateCurrentBuffer method:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

// listener.onGenerateBlock(blockId):
def onGenerateBlock(blockId: StreamBlockId): Unit = {
  // Remember the offsets of topics/partitions when a block has been generated
  rememberBlockOffsets(blockId)
}
// rememberBlockOffsets:
/**
 * Remember the current offsets for each topic and partition. This is called when a block is
 * generated.
 */
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
  // Get a snapshot of current offset map and store with related block id.
  val offsetSnapshot = topicPartitionOffsetMap.toMap
  blockOffsetMap.put(blockId, offsetSnapshot)
  topicPartitionOffsetMap.clear()
}
// In short: the topic-partition -> offset map is cleared,
// and the block -> (topic-partition -> offset) mapping is recorded.
blocksForPushing is a bounded blocking queue that another thread keeps polling:
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

/** Keep pushing blocks to the BlockManager. */
// keeps polling the blocksForPushing queue and handles each push-block event
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) { // loop as long as the thread has not been stopped
      // poll with a timeout: remove and return the head, or return after 10 ms
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block) // process the block if there is one
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) { // keep processing whatever is still in the queue
      val block = blocksForPushing.take() // blocking call, but it returns immediately because the queue is non-empty
      logDebug(s"Pushing block $block")
      pushBlock(block) // process the block
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
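Stripped of Spark's details, the generator follows a simple "swap the buffer on a timer, push from a separate thread" pattern. A minimal stand-alone sketch of that pattern (the class name, intervals and record data are invented for illustration):

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object MiniBlockGenerator {
  final case class Block(id: Long, records: Seq[String])

  def main(args: Array[String]): Unit = {
    val lock = new Object
    var currentBuffer = new ArrayBuffer[String]
    val blocksForPushing = new ArrayBlockingQueue[Block](10) // bounded, like Spark's queue

    // timer: every 200 ms swap out the buffer and enqueue it as a block
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        val snapshot = lock.synchronized {
          val b = currentBuffer; currentBuffer = new ArrayBuffer[String]; b
        }
        if (snapshot.nonEmpty) blocksForPushing.put(Block(System.currentTimeMillis(), snapshot))
      }
    }, 200, 200, TimeUnit.MILLISECONDS)

    // pusher thread: poll the queue and "store" each block
    val pusher = new Thread(new Runnable {
      override def run(): Unit = while (true) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS))
          .foreach(b => println(s"pushed block ${b.id} with ${b.records.size} records"))
      }
    })
    pusher.setDaemon(true)
    pusher.start()

    // "receiver" side: keep adding records to the current buffer
    (1 to 1000).foreach { i => lock.synchronized { currentBuffer += s"record-$i" }; Thread.sleep(5) }
    timer.shutdown()
  }
}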
The pushBlock method called by keepPushingBlocks is:
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
The listener it calls (org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler) implements onPushBlock as:
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
  // Store block and commit the blocks offset
  storeBlockAndCommitOffset(blockId, arrayBuffer)
}
The code of storeBlockAndCommitOffset:
/**
 * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
 * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
 */
private def storeBlockAndCommitOffset(
    blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
  var count = 0
  var pushed = false
  var exception: Exception = null
  while (!pushed && count <= 3) { // up to 3 retries in total
    try {
      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
      pushed = true
    } catch {
      case ex: Exception =>
        count += 1
        exception = ex
    }
  }
  if (pushed) { // the block has been pushed
    // commit the offsets
    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
    // once the block is in the BlockManager, the block -> (topic-partition -> offset) mapping is no longer kept
    blockOffsetMap.remove(blockId)
  } else {
    stop("Error while storing block into Spark", exception)
  }
}
// commitOffset:
/**
 * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
 * metadata schema in Zookeeper.
 */
private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
  if (zkClient == null) {
    val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
    stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
    return
  }

  for ((topicAndPart, offset) <- offsetMap) {
    try {
      // the consumer's partition directory in ZooKeeper
      val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
      // update the consumer's consumed offset for this topic-partition
      ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
    } catch {
      case e: Exception =>
        logWarning(s"Exception during commit offset $offset for topic" +
          s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
    }

    logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
      s"partition ${topicAndPart.partition}")
  }
}
The key method store:
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def store(dataBuffer: ArrayBuffer[T]) {
  supervisor.pushArrayBuffer(dataBuffer, None, None)
}
It calls pushArrayBuffer on the supervisor (an org.apache.spark.streaming.receiver.ReceiverSupervisorImpl instance), which does the following:
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}
The source of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock:
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  // 1. prepare the blockId, timestamp, etc.
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // 2. store the block
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  // 3. number of records that were stored
  val numRecords = blockStoreResult.numRecords
  // 4. tell the trackerEndpoint that a block was added, which updates the driver-side WAL
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
receivedBlockHandler is assigned as follows:
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
        "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
        "See documentation for more details.")
    }
    // the WAL is enabled and the checkpoint dir is set, so a WriteAheadLogBasedBlockHandler is returned here;
    // it holds the blockManager, streamId, storageLevel, conf, hadoopConf and checkpointDir
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
The storeBlock method of this ReceivedBlockHandler (the WriteAheadLogBasedBlockHandler):
/**
 * This implementation stores the block into the block manager as well as a write ahead log.
 * It does this in parallel, using Scala Futures, and returns only after the block has
 * been stored in both places.
 */
// Stores the block in the BlockManager and in the write-ahead log in parallel, using Scala Futures,
// and returns only after both writes have completed.
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]
  // Serialize the block so that it can be inserted into both
  // 1. serialize the ReceivedBlock (no compression) into a byte buffer
  val serializedBlock = block match { // serializedBlock is the serialized result
    case ArrayBufferBlock(arrayBuffer) => // this branch is taken
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.dataSerialize(blockId, arrayBuffer.iterator)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
      numRecords = countIterator.count
      serializedBlock
    case ByteBufferBlock(byteBuffer) =>
      byteBuffer
    case _ =>
      throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
  }

  // 2. Store the block in block manager
  val storeInBlockManagerFuture = Future {
    val putResult =
      blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
  }

  // 3. Store the block in write ahead log
  val storeInWriteAheadLogFuture = Future {
    writeAheadLog.write(serializedBlock, clock.getTimeMillis())
  }

  // 4. Combine the futures, wait for both to complete, and return the write ahead log record handle
  val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
  // wait for the combined future; the default timeout is 30s, configurable via spark.streaming.receiver.blockStoreTimeout
  val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
  // return the information about the cached block
  WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
}
Sending the WAL block info to the driver
Note the WriteAheadLogBasedStoreResult instance returned here; it is used later when the RDD is processed.
The part of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock that notifies the driver of the added block:
// 4. tell the trackerEndpoint that a block was added, which updates the driver-side WAL
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
The driver writes the WAL block metadata into its own WAL
Skipping the intermediate RPC plumbing, we land on the driver side in org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receiveAndReply:
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
The addBlock method it calls is:
/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
The source of org.apache.spark.streaming.scheduler.ReceivedBlockTracker#addBlock is:
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}

/** Get the queue of received blocks belonging to a particular stream */
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
The code above mainly does two things: it writes a BlockAdditionEvent to the driver-side WAL and it updates the in-memory bookkeeping, a mutable.HashMap[Int, ReceivedBlockQueue] that maps each streamId to its queue of not-yet-allocated blocks. A simplified sketch of this bookkeeping follows.
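A REPL-style sketch of that map (BlockMeta is an invented stand-in for ReceivedBlockInfo, not a Spark class):

import scala.collection.mutable

case class BlockMeta(blockId: String, numRecords: Option[Long])

// streamId -> queue of blocks that have been reported but not yet assigned to a batch
val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, mutable.Queue[BlockMeta]]

def trackBlock(streamId: Int, meta: BlockMeta): Unit =
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new mutable.Queue[BlockMeta]) += meta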
Reading data from the WAL-backed RDD
The source of KafkaUtils.createStream is as follows:
/**
 * Create an input stream that pulls messages from Kafka Brokers.
 * @param ssc StreamingContext object
 * @param kafkaParams Map of kafka configuration parameters,
 *                    see http://kafka.apache.org/08/configuration.html
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread.
 * @param storageLevel Storage level to use for storing the received objects
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam U type of Kafka message key decoder
 * @tparam T type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[(K, V)] = {
  // The WAL is enabled by setting spark.streaming.receiver.writeAheadLog.enable to true
  val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
  new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
It creates a KafkaInputDStream:
/**
 * Input stream that pulls messages from a Kafka Broker.
 *
 * @param kafkaParams Map of kafka configuration parameters.
 *                    See: http://kafka.apache.org/configuration.html
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread.
 * @param storageLevel RDD storage level.
 */
private[streaming]
class KafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag](
    ssc_ : StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

  def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) { // WAL disabled: use a KafkaReceiver
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {                    // WAL enabled: use a ReliableKafkaReceiver
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }
}
org.apache.spark.streaming.kafka.KafkaInputDStream inherits the compute method from its parent class, ReceiverInputDStream:
/**
 * Generates RDDs with blocks received by the receiver of this stream.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
getBlocksOfBatch is defined as follows:
/** Get the blocks for the given batch and all input streams. */
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
  receivedBlockTracker.getBlocksOfBatch(batchTime)
}

which in turn calls:

/** Get the blocks allocated to the given batch. */
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
  timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
}
JobGenerator assigns WAL blocks to a batch and generates jobs
Retrieving the WAL block information
A recurring timer is declared in org.apache.spark.streaming.scheduler.JobGenerator:
// The timer generates a GenerateJobs event once per batch interval and posts it into the
// eventLoop's blocking queue
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
The EventLoop is instantiated as follows:
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()
EventLoop holds a LinkedBlockingDeque (a double-ended blocking queue) and a daemon thread. The daemon thread keeps taking events from the queue, blocking while it is empty, and for each event it calls onReceive, which here is processEvent (see the sketch after the next listing):
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
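The sketch below (invented names, not Spark's implementation) shows the shape of this timer-plus-event-loop machinery: a scheduled task posts a GenerateJobs event once per "batch interval", and a daemon thread drains the blocking deque and dispatches each event:

import java.util.concurrent.{Executors, LinkedBlockingDeque, TimeUnit}

object JobGeneratorSketch extends App {
  sealed trait Event
  case class GenerateJobs(batchTime: Long) extends Event

  val queue = new LinkedBlockingDeque[Event]()

  // Daemon consumer: the analogue of EventLoop's internal thread calling onReceive/processEvent
  val consumer = new Thread(new Runnable {
    override def run(): Unit =
      while (true) queue.take() match {
        case GenerateJobs(t) => println(s"generate jobs for batch $t")
      }
  })
  consumer.setDaemon(true)
  consumer.start()

  // Recurring producer: the analogue of RecurringTimer posting GenerateJobs every batchDuration
  val timer = Executors.newSingleThreadScheduledExecutor()
  timer.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = queue.put(GenerateJobs(System.currentTimeMillis()))
  }, 0L, 5L, TimeUnit.SECONDS)

  Thread.sleep(20000) // keep the demo alive for a few batches, then exit
  timer.shutdownNow()
}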
Since the event here is GenerateJobs, generateJobs is called next:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // 1. Allocate the WAL block metadata to this batch (this metadata was sent to the driver
    //    by the worker nodes after the blocks were written to the WAL)
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // 2. Generate jobs using the allocated blocks
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // Post a DoCheckpoint event: new checkpoint data is written to HDFS and old checkpoint
  // data is deleted
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
The method called in step 1, org.apache.spark.streaming.scheduler.ReceiverTracker#allocateBlocksToBatch, is:
/** Allocate all unallocated blocks to the given batch. */
def allocateBlocksToBatch(batchTime: Time): Unit = {
  if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
  }
}
It delegates to org.apache.spark.streaming.scheduler.ReceivedBlockTracker#allocateBlocksToBatch:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // For every input stream, drain its queue of unallocated blocks by streamId and build a
    // [streamId -> Seq[ReceivedBlockInfo]] map; at this point the block metadata reported by
    // the receivers has effectively been handed over to the batch.
    // This is the streamId -> WAL blocks mapping.
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
getReceivedBlockQueue is defined as:
/** Get the queue of received blocks belonging to a particular stream */
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
As can be seen, the block metadata that the worker nodes sent over has now been dequeued and assigned to the batch; a simplified sketch of this allocation step follows.
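Continuing the earlier bookkeeping sketch, the allocation step amounts to draining every stream's queue and pinning the result to the batch time (again a REPL-style sketch with invented stand-in types; block ids are plain strings here):

import scala.collection.mutable

def allocateBlocksToBatch(
    batchTime: Long,
    unallocated: mutable.HashMap[Int, mutable.Queue[String]],
    timeToAllocatedBlocks: mutable.HashMap[Long, Map[Int, Seq[String]]]): Unit = {
  // Mirrors getReceivedBlockQueue(streamId).dequeueAll(x => true) for every streamId
  val streamIdToBlocks: Map[Int, Seq[String]] = unallocated.keys.map { streamId =>
    streamId -> unallocated(streamId).dequeueAll(_ => true).toList
  }.toMap
  timeToAllocatedBlocks.put(batchTime, streamIdToBlocks)
}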
Creating the RDD from the WAL blocks
The source of org.apache.spark.streaming.dstream.ReceiverInputDStream#createBlockRDD is:
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
    // If every block already carries a WriteAheadLogRecordHandle, a WriteAheadLogBackedBlockRDD
    // is created; otherwise a plain BlockRDD is created.
    // A WriteAheadLogRecordHandle is the entry info associated with a WAL record; its
    // implementation FileBasedWriteAheadLogSegment carries the path, offset and length of the
    // WAL segment. When the RDD actually needs the data, it reads it back from the WAL using
    // these handles.
    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
The source of org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD#compute is:
/**
 * Gets the partition data by getting the corresponding block from the block manager.
 * If the block does not exist, then the data is read from the corresponding record
 * in write ahead log files.
 */
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  assertValid()
  val hadoopConf = broadcastedHadoopConf.value
  val blockManager = SparkEnv.get.blockManager
  val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
  val blockId = partition.blockId

  def getBlockFromBlockManager(): Option[Iterator[T]] = {
    blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
  }

  def getBlockFromWriteAheadLog(): Iterator[T] = {
    var dataRead: ByteBuffer = null
    var writeAheadLog: WriteAheadLog = null
    try {
      // The WriteAheadLogUtils.createLog*** method needs a directory to create a
      // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
      // writing log data. However, the directory is not needed if data needs to be read, hence
      // a dummy path is provided to satisfy the method parameter requirements.
      // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
      // this dummy directory should not already exist otherwise the WAL will try to recover
      // past events from the directory and throw errors.
      val nonExistentDirectory = new File(
        System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
      writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
        SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
      dataRead = writeAheadLog.read(partition.walRecordHandle)
    } catch {
      case NonFatal(e) =>
        throw new SparkException(
          s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
    } finally {
      if (writeAheadLog != null) {
        writeAheadLog.close()
        writeAheadLog = null
      }
    }
    if (dataRead == null) {
      throw new SparkException(
        s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
        s"read returned null")
    }
    logInfo(s"Read partition data of $this from write ahead log, record handle " +
      partition.walRecordHandle)
    if (storeInBlockManager) {
      blockManager.putBytes(blockId, dataRead, storageLevel)
      logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
      dataRead.rewind()
    }
    blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
  }

  // If partition.isBlockIdValid is true, the block data should still exist on the executors.
  if (partition.isBlockIdValid) {
    // Try the BlockManager first and fall back to the WAL if the block is no longer there.
    // The BlockManager fetches the block locally (from memory or disk) or remotely; a remote
    // fetch is synchronous, i.e. it blocks until the block has been fetched.
    getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
  } else {
    getBlockFromWriteAheadLog()
  }
}
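The read path at the end of compute boils down to an Option fallback; a REPL-style sketch with made-up helper names (the first models BlockManager.get, the second models WriteAheadLog.read plus deserialization):

// Placeholder readers, not Spark APIs
def readFromBlockManager(blockId: String): Option[Iterator[String]] = None // e.g. block was evicted
def readFromWriteAheadLog(recordHandle: String): Iterator[String] = Iterator("record-1", "record-2")

def readPartition(blockId: String, recordHandle: String, blockIdValid: Boolean): Iterator[String] =
  if (blockIdValid) readFromBlockManager(blockId).getOrElse(readFromWriteAheadLog(recordHandle))
  else readFromWriteAheadLog(recordHandle)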
This concludes the walk-through: starting the receivers, receiving data and persisting it as WAL blocks, reporting the WAL block metadata to the driver (which records it in its own WAL), and finally reading the data back through the WAL-backed RDD when Spark Streaming processes the batch.