1.問題
主要是updateStateByKey的問題
有的值不需要變化的時候,還會再打印出來。
每個批次的數據都會出現,如果向redis保存更新的時候,會把不需要變化的值也更新,這個不是我們需要的,我們只需要更新有變化的那部分值。
2.mapWithState
有一個注解,說明是實驗性質的。
3.程序
1 package com.stream.it 2 import org.apache.spark.rdd.RDD 3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.dstream.DStream 5 import org.apache.spark.streaming.kafka.KafkaUtils 6 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} 7 import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} 8 9 object MapWithState { 10 def main(args: Array[String]): Unit = { 11 val conf = new SparkConf() 12 .setAppName("StreamingMapWithState") 13 .setMaster("local[*]") 14 val sc = SparkContext.getOrCreate(conf) 15 val ssc = new StreamingContext(sc, Seconds(1)) 16 // 當調用updateStateByKey函數API的時候,必須給定checkpoint dir 17 // 路徑對應的文件夾不能存在 18 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir45254") 19 20 /** 21 * 22 * @param key DStream的key數據類型 23 * @param values DStream的value數據類型 24 * @param state 是StreamingContext中之前該key的狀態值 25 * @return 26 */ 27 def mappingFunction(key: String, values: Option[Int], state: State[Long]): (String, Long) = { 28 // 獲取之前狀態的值 29 val preStateValue = state.getOption().getOrElse(0L) 30 // 計算出當前值 31 val currentStateValue = preStateValue + values.getOrElse(0) 32 33 // 更新狀態值 34 state.update(currentStateValue) 35 36 // 返回結果 37 (key, currentStateValue) 38 } 39 val spec = StateSpec.function[String, Int, Long, (String, Long)](mappingFunction _) 40 41 val kafkaParams = Map( 42 "group.id" -> "streaming-kafka-001231", 43 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka", 44 "auto.offset.reset" -> "smallest" 45 ) 46 val topics = Map("beifeng" -> 4) // topics中value是讀取數據的線程數量,所以必須大於等於1 47 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( 48 ssc, // 給定SparkStreaming上下文 49 kafkaParams, // 給定連接kafka的參數信息 ===> 通過Kafka HighLevelConsumerAPI連接 50 topics, // 給定讀取對應topic的名稱以及讀取數據的線程數量 51 StorageLevel.MEMORY_AND_DISK_2 // 指定數據接收器接收到kafka的數據后保存的存儲級別 52 ).map(_._2) 53 54 val resultWordCount: DStream[(String, Long)] = dstream 55 .filter(line => line.nonEmpty) 56 .flatMap(line => line.split(" ").map((_, 1))) 57 .reduceByKey(_ + _) 58 .mapWithState(spec) 59 60 resultWordCount.print() // 這個也是打印數據 61 62 // 啟動開始處理 63 ssc.start() 64 ssc.awaitTermination() // 等等結束,監控一個線程的中斷操作 65 } 66 }
4.效果
在控制台上再寫入一個hadoop:
說明了,在新寫入的時候,才會出現,但是以前的數據還在。
5.說明
因為存在checkpoint,在重新后,以前的數據還在,新加入數據后,會在原有的基礎上進行更新,上面的第二幅圖就是這樣產生的。