Spark Streaming之窗口函數和狀態轉換函數


流處理主要有3種應用場景:無狀態操作、window操作、狀態操作。

reduceByKeyAndWindow

import kafka.serializer.StringDecoder import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SQLContext import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming._ import org.apache.spark.{SparkContext, SparkConf} object ClickStream { def main (args: Array[String]){ // 屏蔽不必要的日志顯示在終端上
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //創建SparkConf對象,設置應用程序的名稱,在程序運行的監控界面可以看到名稱
    val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]") val sc = new SparkContext(conf) //此處設置Batch Interval是在Spark Streaming中生成基本Job的時間單位,窗口和滑動時間間隔一定是該Batch Interval的整數倍
    val ssc = new StreamingContext(sc, Seconds(args(0).toLong)) //由於用到了窗口函數,需要復用前面的RDD,必須checkpoint,注意復用的RDD之間是沒有任何關系的
    ssc.checkpoint(args(1)) val topics = Set("clickstream")    //所要獲取數據在kafka上的主題
    val brokers = "yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) //val offset = "largest" //values: smallest, largest ,控制讀取最新的數據,還是舊的數據, 默認值為largest //從Spark1.3開始,我們能夠使用如下方式高效地從kafka上獲取數據
    val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val kvs = kvsTemp.map(line => line._2)                 //第一部分是null為key,第二部分才是所需數據,為string類型 //根據需求對流進來的數據進行清洗、轉換等處理
    val data = kvs.map(_.split("\\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1)) //滑動窗口長度為1小時,滑動間隔為10分鍾,這會得到過去1小時內,url和pv的對應關系 //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10)) //滑動窗口長度為1小時,滑動間隔為10分鍾,這同樣會得到過去1小時內,url和pv的對應關系,只不過這是加新減舊,第一個參數加上新的,第2個參數,減去上一個batch的。
//和上一個版本的reduceByKeyAndWindow每次都會重新算相比(疊加方式),這種方式(增量方式)更加高效優雅
val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10)) pvWindow.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminat ssc.stop(true, true) //優雅地結束 } }

countByValueAndWindow

countByValueAndWindow的源碼如下所示:

 /** * Return a new DStream in which each RDD contains the count of distinct elements in * RDDs in a sliding window over this DStream. Hash partitioning is used to generate * the RDDs with `numPartitions` partitions (Spark's default number of partitions if * `numPartitions` not specified). * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval * @param numPartitions number of partitions of each RDD in the new DStream. */ def countByValueAndWindow( windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism) (implicit ord: Ordering[T] = null) : DStream[(T, Long)] = ssc.withScope { this.map((_, 1L)).reduceByKeyAndWindow( (x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y, windowDuration, slideDuration, numPartitions, (x: (T, Long)) => x._2 != 0L ) }

reduceByWindow

reduceByWindow的源碼如下所示:

/** * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a sliding window over this DStream. However, the reduction is done incrementally * using the old window's reduced value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient than reduceByWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse reduce function; such that for all y, invertible x: * `invReduceFunc(reduceFunc(x, y), x) = y` * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.map((1, _)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) }

countByWindow

countByWindow的源碼如下所示:

 /** * Return a new DStream in which each RDD has a single element generated by counting the number * of elements in a sliding window over this DStream. Hash partitioning is used to generate * the RDDs with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ def countByWindow( windowDuration: Duration, slideDuration: Duration): DStream[Long] = ssc.withScope { this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) }

由此可見,countByValueAndWindow、reduceByWindow、countByWindow的底層實現都是“加新減舊”版本的reduceByKeyAndWindow。

上面,求出了每一小時窗口內的Url和Pv的對應關系,如果想求出相同的Url在上一個窗口的Pv和本次窗口的Pv的比值,那么這時侯updateStateByKey,mapWithState就粉墨登場了。由於updateStateByKey和mapWithState二者之間有10倍左右的性能差異。

這里,只涉及mapWithState。

mapWithState

import kafka.serializer.StringDecoder import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SQLContext import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming._ import org.apache.spark.{SparkContext, SparkConf} object ClickStream { def main (args: Array[String]){ // 屏蔽不必要的日志顯示在終端上
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //創建SparkConf對象,設置應用程序的名稱,在程序運行的監控界面可以看到名稱
    val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]") val sc = new SparkContext(conf) //此處設置Batch Interval是在Spark Streaming中生成基本Job的時間單位,窗口和滑動時間間隔一定是該Batch Interval的整數倍
    val ssc = new StreamingContext(sc, Seconds(args(0).toLong)) //由於用到了窗口函數,需要復用前面的RDD,必須checkpoint,注意復用的RDD之間是沒有任何關系的
    ssc.checkpoint(args(1)) val topics = Set("clickstream")    //所要獲取數據在kafka上的主題
    val brokers = yz4207.hadoop.data.sina.com.cn:19092,yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) //val offset = "largest" //values: smallest, largest ,控制讀取最新的數據,還是舊的數據, 默認值為largest //從Spark1.3開始,我們能夠使用如下方式高效地從kafka上獲取數據
    val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val kvs = kvsTemp.map(line => line._2)                 //第一部分是null為key,第二部分才是所需數據,為string類型 //根據需求對流進來的數據進行清洗、轉換等處理
    val data = kvs.map(_.split("\\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1)) //滑動窗口長度為1小時,滑動間隔為10分鍾,這會得到過去1小時內,url和pv的對應關系 //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10)) //滑動窗口長度為1小時,滑動間隔為10分鍾,這同樣會得到過去1小時內,url和pv的對應關系,只不過這是加新減舊,第一個參數加上新的,第2個參數,減去上一個batch的。和上一個版本的reduceByKeyAndWindow每次都會重新算相比(疊加方式),
//這種方式(增量方式)更加高效優雅
val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10)) //key是K, value是新值,state是原始值(本batch之前的狀態值)。這里你需要把state更新為新值 val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => { val currentPV = value.getOrElse(0) val output = (key, currentPV, state.getOption().getOrElse(0)) state.update(currentPV) output } //StateSpec只是一個包裹,實際操作仍然是定義的mappingFunc函數 val urlPvs = pvWindow.mapWithState(StateSpec.function(mappingFunc)) //url,當前batch的PV,上一個batch的PV urlPvs.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminat ssc.stop(true, true) //優雅地結束 } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM