068 mapWithState函數的講解


1.問題

  主要是updateStateByKey的問題

  有的值不需要變化的時候,還會再打印出來。

  每個批次的數據都會出現,如果向redis保存更新的時候,會把不需要變化的值也更新,這個不是我們需要的,我們只需要更新有變化的那部分值。

  

2.mapWithState

  有一個注解,說明是實驗性質的。

  

 

3.程序

 1 package com.stream.it
 2 import org.apache.spark.rdd.RDD
 3 import org.apache.spark.storage.StorageLevel
 4 import org.apache.spark.streaming.dstream.DStream
 5 import org.apache.spark.streaming.kafka.KafkaUtils
 6 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
 7 import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
 8 
 9 object MapWithState {
10   def main(args: Array[String]): Unit = {
11     val conf = new SparkConf()
12       .setAppName("StreamingMapWithState")
13       .setMaster("local[*]")
14     val sc = SparkContext.getOrCreate(conf)
15     val ssc = new StreamingContext(sc, Seconds(1))
16     // 當調用updateStateByKey函數API的時候,必須給定checkpoint dir
17     // 路徑對應的文件夾不能存在
18     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir45254")
19 
20     /**
21       *
22       * @param key    DStream的key數據類型
23       * @param values DStream的value數據類型
24       * @param state  是StreamingContext中之前該key的狀態值
25       * @return
26       */
27     def mappingFunction(key: String, values: Option[Int], state: State[Long]): (String, Long) = {
28       // 獲取之前狀態的值
29       val preStateValue = state.getOption().getOrElse(0L)
30       // 計算出當前值
31       val currentStateValue = preStateValue + values.getOrElse(0)
32 
33       // 更新狀態值
34       state.update(currentStateValue)
35 
36       // 返回結果
37       (key, currentStateValue)
38     }
39     val spec = StateSpec.function[String, Int, Long, (String, Long)](mappingFunction _)
40 
41     val kafkaParams = Map(
42       "group.id" -> "streaming-kafka-001231",
43       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
44       "auto.offset.reset" -> "smallest"
45     )
46     val topics = Map("beifeng" -> 4) // topics中value是讀取數據的線程數量,所以必須大於等於1
47     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
48       ssc, // 給定SparkStreaming上下文
49       kafkaParams, // 給定連接kafka的參數信息 ===> 通過Kafka HighLevelConsumerAPI連接
50       topics, // 給定讀取對應topic的名稱以及讀取數據的線程數量
51       StorageLevel.MEMORY_AND_DISK_2 // 指定數據接收器接收到kafka的數據后保存的存儲級別
52     ).map(_._2)
53 
54     val resultWordCount: DStream[(String, Long)] = dstream
55       .filter(line => line.nonEmpty)
56       .flatMap(line => line.split(" ").map((_, 1)))
57       .reduceByKey(_ + _)
58       .mapWithState(spec)
59 
60     resultWordCount.print() // 這個也是打印數據
61 
62     // 啟動開始處理
63     ssc.start()
64     ssc.awaitTermination() // 等等結束,監控一個線程的中斷操作
65   }
66 }

 

4.效果

  

  在控制台上再寫入一個hadoop:

    說明了,在新寫入的時候,才會出現,但是以前的數據還在。

  

 

5.說明

  因為存在checkpoint,在重新后,以前的數據還在,新加入數據后,會在原有的基礎上進行更新,上面的第二幅圖就是這樣產生的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM