package SparkStreaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by 古城小巷少年 on 2020-01-03 10:03
 */
object KafkaDirectWordCount {
  def main(args: Array[String]): Unit = {
    // Consumer group name: multiple groups can consume the same topic, each tracking its own offsets.
    // Within a group, each topic partition is consumed by exactly one consumer.
    val group = "g001"
    val conf = new SparkConf().setAppName("kafkaDirectWordCount").setMaster("local[2]")
    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Topic to consume
    val topic = "wwcc"
    // Kafka broker list
    val brokerList = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
    // ZooKeeper quorum, used later to update the consumed offsets
    val zkQuorum = "hadoop102:2181,hadoop103:2181,hadoop104:2181"
    // Set of topic names used when creating the stream; Spark Streaming can consume multiple topics
    val topics: Set[String] = Set(topic)
    // ZKGroupTopicDirs points at the ZooKeeper directory where this group's offsets are stored
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    // Resulting ZooKeeper path, e.g. "/consumers/g001/offsets/wwcc"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    // Kafka parameters
    val kafkaParams: Map[String, String] = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      // Start from the earliest message when no offset has been saved
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )
    // ZooKeeper client used to read offsets on startup and update them after each batch
    val zkClient = new ZkClient(zkQuorum)
    // Count the child nodes under the offset path (one per partition)
    val children: Int = zkClient.countChildren(zkTopicPath)

    var kafkaStream: InputDStream[(String, String)] = null
    // If ZooKeeper holds saved offsets, they become the stream's starting position
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    if (children > 0) {
      // Offsets were saved previously
      for (i <- 0 until children) {
        // Read the saved offset for partition i
        val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")
        // Wrap topic and partition in a TopicAndPartition object, e.g. wwcc/0
        val tp = TopicAndPartition(topic, i)
        // Add (TopicAndPartition -> offset) to the fromOffsets map
        fromOffsets += (tp -> partitionOffset.toLong)
      }
      // Turn each Kafka message into a (key, value) pair: key is the message key, value the payload
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
      // Create the direct DStream via KafkaUtils;
      // fromOffsets makes consumption resume from the previously saved offsets.
      // Assign to the outer var (the original shadowed it with a local val, leaving it null).
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // No offsets saved yet: start according to auto.offset.reset
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    // Offset ranges of the current batch
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    // transform exposes each batch's RDD: capture its offset ranges, then return the RDD unchanged.
    // The underlying RDD is a KafkaRDD, which implements HasOffsetRanges.
    val transform: DStream[(String, String)] = kafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }
    val messages: DStream[String] = transform.map(_._2)

    // Iterate over each RDD in the DStream
    messages.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach(x => println(x))
      }
      // Persist each partition's untilOffset to ZooKeeper after processing;
      // committing after output gives at-least-once semantics.
      for (o <- offsetRanges) {
        val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
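
// ---------------------------------------------------------------------------
// A minimal read-back sketch (not part of the original job): it prints the
// offset stored for each partition, using the same ZkClient calls the job
// itself relies on. The quorum, group, and topic assume the values hardcoded
// above; adjust them for your cluster.
// ---------------------------------------------------------------------------
object ZkOffsetCheck {
  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("hadoop102:2181,hadoop103:2181,hadoop104:2181")
    // ZKGroupTopicDirs lays offsets out as /consumers/<group>/offsets/<topic>/<partition>
    val path = new ZKGroupTopicDirs("g001", "wwcc").consumerOffsetDir
    val partitions = zkClient.countChildren(path)
    for (i <- 0 until partitions) {
      val offset: String = zkClient.readData[String](s"$path/$i")
      println(s"partition $i -> offset $offset")
    }
    zkClient.close()
  }
}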