spark streaming 實戰


最近在學習spark的相關知識, 重點在看spark streaming 和spark mllib相關的內容。

關於spark的配置: http://www.powerxing.com/spark-quick-start-guide/

這篇博客寫的很全面:http://www.liuhaihua.cn/archives/134765.html

spark streaming:

是spark系統中處理流數據的分布式流處理框架,能夠以最低500ms的時間間隔對流數據進行處理,延遲大概1s左右,

是一個准實時的流處理框架。 

spark streaming 可以和 spark SQL、MLlib 和GraphX相結合,共同完成基於實時處理的復雜系統。

spark steaming 的原理:

如上圖所示, spark streaming 將輸入的數據按時間分割為若干段,每一段對應以惡spark job, 最后將處理后的任務按返回,就像流水一樣。

DStram:

是 Spark Streaming 對內部持續的實時數據流的抽象描述,即我們處理的一個實時數據流,在 Spark Streaming 中對應於一個 DStream 實例,

通俗的講Dstream 一系列是RDD的集合。

 

spark Streaming 編程模型

DStream ( Discretized Stream )作為 Spark Streaming 的基礎抽象,它代表持續性的數據流。這些數據流既可以通過外部輸入源賴獲取,也可以通過現有的 Dstream 的 transformation 操作來獲得。在內部實現上, DStream 由一組時間序列上連續的 RDD 來表示。每個 RDD 都包含了自己特定時間間隔內的數據流, 如下圖所示:

而對DStream 的操作,也是映射到其內部的RDD上的,如下圖,通過轉換操作生存新的DStram:

 

 

spark Streaming 的三種運行場景:

1. 無狀態操作

2. 有狀態操作(updateStateByKey)

3. window操作

接下來分別說明。

 

 無狀態操作:每次計算的時間,僅僅計算當前時間切片的內容,如,每次只計算1s時間內產生的RDD

 有狀態操作:不斷的把當前的計算和歷史時間切片的RDD進行累計,如,計算某個單詞出現的次數,需要把當前的狀態與歷史的狀態相累加,隨着時間的流逝, 數據規模會越來越大

基於window的操作:針對特定的時間段,並以特定的時間間隔為單位的滑動操作,如每隔10秒,統計一下過去30秒過來的數據

如上圖,紅色的圈代表一個window,里面包含3個時間,並且window 每隔2個時間滑動一次,因此:

所以基於窗口的操作,需要指定2個參數:

  • window length - The duration of the window (3 in the figure)
  • slide interval - The interval at which the window-based operation is performed (2 in the figure). 

 

編程實戰:

官方提供的wordCount的實例:

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

首先運行

nc -lk 9999

然后打開另一個窗口,在spark的目錄下 運行

 ./bin/run-example streaming.NetworkWordCount localhost 9999

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM