SparkStreaming的Receiver方式和直連方式有什么區別?
Receiver接收固定時間間隔的數據(放在內存中的),使用高級API,自動維護偏移量,達到固定的時間才去進行處理,效率低並且容易丟失數據,靈活性特別差,不好,而且它處理數據的時候,如果某一刻的數據量過大,那么就會造成磁盤溢寫的情況,他通過WALS進行磁盤寫入。
Receiver實現方式:
代碼如下:
object KafkaWC02 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("kafkaWC").setMaster("local[2]") //設置線程數 val ssc = new StreamingContext(conf, Seconds(5)) //設置檢查點 ssc.checkpoint("D:\\data\\checpoint\\checpoint1") //接下來編寫kafka的配置信息 val zks = "spark01:2181" //然后是kafka的消費組 val groupId = "gp1" //Topic的名字 Map的key是Topic名字,第二個參數是線程數 val topics = Map[String, Int]("test02" -> 1) //創建kafka的輸入數據流,來獲取kafka中的數據 val data = KafkaUtils.createStream(ssc, zks, groupId, topics) //獲取到的數據是鍵值對的格式(key,value) //獲取到的數據是 key是偏移量 value是數據 //接下來開始處理數據 val lines = data.flatMap(_._2.split(" ")) val words = lines.map((_, 1)) val res = words.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true) res.print() //val result = words.reduceByKey(_ + _) //val res = result.updateStateByKey[Int](updateFunc) //res.print() //打印輸出 //result.print() //啟動程序 ssc.start() //等待停止 ssc.awaitTermination() } //(iterator:Iteratot[(K,Seq[V]),Option[S]])) //傳過來的值是Key Value類型 //第一個參數,是我們從kafka獲取到的元素,key ,String類型 //第二個參數,是我們進行單詞統計的value值,Int類型 //第三個參數,是我們每次批次提交的中間結果集 val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ iter.map(t=>{ (t._1,t._2.sum+t._3.getOrElse(0)) }) } }
Direct直連方式,
它使用的是底層API實現Offest我們開發人員管理,這樣的話,它的靈活性特別好。並且可以保證數據的安全性,而且不用擔心數據量過大,因為它有預處理機制,進行提前處理,然后再批次提交任務。
Direct實現方式:
代碼如下:
import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext} /** * 重要!!! Direct直連方式 */ object KafkaDirectWC { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Direct").setMaster("local[2]") val ssc = new StreamingContext(conf,Duration(5000)) //指定組名 val groupId = "gp01" //指定消費的topic名字 val topic = "tt" //指定kafka的Broker地址(SparkStreaming的Task直接連接到Kafka分區上,用的是底層API消費) val brokerList ="spark:9092" //接下來我們要自己維護offset了,將offset保存到ZK中 val zkQuorum = "spark:2181" //創建stream時使用的topic名字集合,SparkStreaming可以同時消費多個topic val topics:Set[String] = Set(topic) //創建一個ZkGroupTopicDirs對象,其實是指定往Zk中寫入數據的目錄 // 用於保存偏移量 val TopicDirs = new ZKGroupTopicDirs(groupId,topic) //獲取zookeeper中的路徑“/gp01/offset/tt/” val zkTopicPath = s"${TopicDirs.consumerOffsetDir}" //准備kafka參數 val kafkas = Map( "metadata.broker.list"->brokerList, "group.id"->groupId, //從頭開始讀取數據 "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString ) // zookeeper 的host和ip,創建一個client,用於更新偏移量 // 是zookeeper客戶端,可以從zk中讀取偏移量數據,並更新偏移量 val zkClient = new ZkClient(zkQuorum) //"/gp01/offset/tt/0/10001" //"/gp01/offset/tt/1/20001" //"/gp01/offset/tt/2/30001" val clientOffset = zkClient.countChildren(zkTopicPath) // 創建KafkaStream var kafkaStream :InputDStream[(String,String)]= null //如果zookeeper中有保存offset 我們會利用這個offset作為KafkaStream的起始位置 //TopicAndPartition [/gp01/offset/tt/0/ , 8888] var fromOffsets:Map[TopicAndPartition,Long] = Map() //如果保存過offset if(clientOffset > 0){ //clientOffset 的數量其實就是 /gp01/offset/tt的分區數目 for(i<-0 until clientOffset){ // /gp01/offset/tt/ 0/10001 val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}") // tt/0 val tp = TopicAndPartition(topic,i) //將不同partition 對應得offset增加到fromoffset中 // tt/0 -> 10001 fromOffsets += (tp->partitionOffset.toLong) } // key 是kafka的key value 就是kafka數據 // 這個會將kafka的消息進行transform 最終kafka的數據都會變成(kafka的key,message)這樣的Tuple val messageHandler = (mmd:MessageAndMetadata[String,String])=> (mmd.key(),mmd.message()) // 通過kafkaUtils創建直連的DStream //[String,String,StringDecoder, StringDecoder,(String,String)] // key value key解碼方式 value的解碼方式 接收數據的格式 kafkaStream = KafkaUtils.createDirectStream [String,String,StringDecoder, StringDecoder,(String,String)](ssc,kafkas,fromOffsets,messageHandler) }else{ //如果未保存,根據kafkas的配置使用最新的或者最舊的offset kafkaStream = KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder](ssc,kafkas,topics) } //偏移量范圍 var offsetRanges = Array[OffsetRange]() //從kafka讀取的數據,是批次提交的,那么這塊注意下, // 我們每次進行讀取數據后,需要更新維護一下偏移量 //那么我們開始進行取值 // val transform = kafkaStream.transform{ // rdd=> // //得到該RDD對應得kafka消息的offset // // 然后獲取偏移量 // offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // rdd // } // val mes = transform.map(_._2) // 依次迭代DStream中的RDD kafkaStream.foreachRDD{ //對RDD進行操作 觸發Action kafkardd=> offsetRanges = kafkardd.asInstanceOf[HasOffsetRanges].offsetRanges //下面 你就可以怎么寫都行了,為所欲為 val maps = kafkardd.map(_._2) maps.foreach(println) for(o<-offsetRanges){ // /gp01/offset/tt/ 0 val zkpath = s"${TopicDirs.consumerOffsetDir}/${o.partition}" //將該partition的offset保存到zookeeper中 // /gp01/offset/tt/ 0/88889 ZkUtils.updatePersistentPath(zkClient,zkpath,o.untilOffset.toString) } } // 啟動 ssc.start() ssc.awaitTermination() } }