package com.home.spark.streaming import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies} /** * @Description: * TODO 更新數據狀態,把每個采集周期的數據進行整合業務處理 * 無狀態操作,即操作的數據都是每個批次內的數據(一個采集周期) * 狀態操作,即操作從啟動到當前的所有采集周期內的數據(跨批次操作) * * UpdateStateByKey原語用於記錄歷史記錄,有時,我們需要在 DStream 中跨批次維護狀態(例如流計算中累加wordcount)。 * 針對這種情況,updateStateByKey() 為我們提供了對一個狀態變量的訪問,用於鍵值對形式的 DStream。 * 給定一個由(鍵,事件)對構成的 DStream,並傳遞一個指定如何根據新的事件 更新每個鍵對應狀態的函數, * 它可以構建出一個新的 DStream,其內部數據為(鍵,狀態) 對。 * updateStateByKey() 的結果會是一個新的 DStream,其內部的 RDD 序列是由每個時間區間對應的(鍵,狀態)對組成的。 * updateStateByKey操作使得我們可以在用新信息進行更新時保持任意的狀態。為使用這個功能,你需要做下面兩步: * 1. 定義狀態,狀態可以是一個任意的數據類型。 * 2. 定義狀態更新函數,用此函數闡明如何使用之前的狀態和來自輸入流的新值對狀態進行更新。 * 使用updateStateByKey需要對檢查點目錄進行配置,會使用檢查點來保存狀態 **/ object Ex_updateState { def main(args: Array[String]): Unit = { val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark streaming wordcount") conf.set("spark.streaming.stopGracefullyOnShutdown", "true") //環境對象,設置采集周期 val scc: StreamingContext = new StreamingContext(conf, Seconds(10)) // TODO: 可以通過ssc.sparkContext 來訪問SparkContext或者通過已經存在的SparkContext來創建StreamingContext //設置檢查點目錄 scc.sparkContext.setCheckpointDir("checkpoint") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "192.168.44.10:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (true: java.lang.Boolean) ) val topics = Array("test") val kafkaStream = KafkaUtils.createDirectStream[String, String]( scc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String]( topics, kafkaParams ) ) val words: DStream[String] = kafkaStream.flatMap(t => t.value().split(" ")) // val words: DStream[String] = socketStream.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) //無狀態操作 val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console // ("------當前窗口數據--------") wordCounts.print //有狀態操作,更新狀態,將所有批次(即采集周期)的詞頻累計 val updatedWordCounts: DStream[(String, Int)] = pairs.updateStateByKey { case (seq, buffer) => { val sum = buffer.getOrElse(0) + seq.sum Option(sum) } } // ("------合計數據--------") updatedWordCounts.print // Start the computation // 通過 streamingContext.start()來啟動消息采集和處理 scc.start() // Wait for the computation to terminate // 通過streamingContext.stop()來手動終止處理程序 scc.awaitTermination() } }