Spark Streaming--實戰篇


摘要:

     Sprak Streaming屬於Saprk API的擴展,支持實時數據流(live data streams)的可擴展,高吞吐(hight-throughput) 容錯(fault-tolerant)的流處理。可以接受來自KafKa,Flume,ZeroMQ Kinesis  Twitter或TCP套接字的數據源,處理的結果數據可以存儲到文件系統 數據庫 現場dashboards等。
 
DStream編程模型
Dstream是Spark streaming中的高級抽象連續數據流,這個數據源可以從外部獲得(如KafKa Flume等),也可以通過輸入流獲得,還可以通過在其他DStream上進行高級操作創建,DStream是通過一組時間序列上連續的RDD表示的,所以一個DStream可以看作是一個RDDs的序列。
 
DStream操作
1.套接字流:通過監聽Socket端口來接收數據。
通過Scala編寫程序來產生一系列的字符作為輸入流:
GenerateChar:
object GenerateChar {
  def generateContext(index : Int) : String = {
    import scala.collection.mutable.ListBuffer
    val charList = ListBuffer[Char]()
    for(i <- 65 to 90)
      charList += i.toChar
    val charArray = charList.toArray
    charArray(index).toString
  }
  def index = {
    import  java.util.Random
    val rdm = new Random
    rdm.nextInt(7) 
  }
  def main(args: Array[String]) {
    val listener = new ServerSocket(9998)
    while(true){
      val socket = listener.accept()
      new Thread(){
        override def run() = {
          println("Got client connected from :"+ socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream,true)
          while(true){
            Thread.sleep(500)
            val context = generateContext(index)  //產生的字符是字母表的前七個隨機字母
            println(context)
            out.write(context + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}
ScoketStreaming:
object ScoketStreaming {
  def main(args: Array[String]) {
    //創建一個本地的StreamingContext,含2個工作線程
    val conf = new SparkConf().setMaster("local[2]").setAppName("ScoketStreaming")
    val sc = new StreamingContext(conf,Seconds(10))   //每隔10秒統計一次字符總數
    //創建珍一個DStream,連接master:9998
    val lines = sc.socketTextStream("master",9998)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x , 1)).reduceByKey(_ + _)
    wordCounts.print()
    sc.start()         //開始計算
    sc.awaitTermination()   //通過手動終止計算,否則一直運行下去
  }
}
運行結果:
GenerateChar產生的數據如下:
Got client connected from :/192.168.31.128
C
G
B
C
F
G
D
G
B
ScoketStreaming運行結果:
-------------------------------------------
Time: 1459426750000 ms
-------------------------------------------
(B,1)
(G,1)
(C,1)
-------------------------------------------
Time: 1459426760000 ms
-------------------------------------------
(B,5)
(F,3)
(D,4)
(G,3)
(C,3)
(E,1)
注意:如果是在本地運行的,setMaster的參數必須為local[n],n >1,官網解釋:
   When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either ofthese means that only one thread 
will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single
thread will be used to run the receiver,leaving no thread for processing the received data. 當在本地運行Spark Streaming程序時,Master的URL不能設置為"local"或"local[1]",這兩種設置都意味着你將會只有一個線程來運行作業,如果你的Input DStream基於一個接收器
(如Kafka,Flume等),那么只有一個線程來接收數據,而沒有多余的線程來處理接收到的數據。
如果是在集群上運行,為Spark streaming應分配的核數應該在大於接收器的數據,否則同樣只接收了數據而沒有能力處理。
 
2.文件流:Spark Streaming通過監控文件系統的變化,若有新文件添加,則將它讀入並作為數據流
需要注意的是:
  1.這些文件具有相同的格式
  2.這些文件通過原子移動或重命名文件的方式在dataDirectory創建
  3.一旦移動這些文件,就不能再進行修改,如果在文件中追加內容,這些追加的新數據也不會被讀取。
FileStreaming:
object FileStreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("FileStreaming")
    val sc = new StreamingContext(conf,Seconds(5))
    val lines = sc.textFileStream("/home/hadoop/wordCount")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x , 1)).reduceByKey(_ + _)
    sc.start()
    sc.awaitTermination()
  }
}
當你在文件目錄里添加文件時,Spark  Streaming就會自動幫你讀入並計算 ,可以讀取本地目錄 HDFS和其他文件系統。
注意:文件流不需要運行接收器,所以不需要分配核數
 
3.RDD隊列流:使用streamingContext.queueStream(queueOfRDD)創建基於RDD隊列的DStream,用於調試Spark Streaming應用程序。
QueueStream:程序每隔1秒就創建一個RDD,Streaming每隔1秒就就對數據進行處理
object QueueStream {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("queueStream")
    //每1秒對數據進行處理
    val ssc = new StreamingContext(conf,Seconds(1))
    //創建一個能夠push到QueueInputDStream的RDDs隊列
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
    //基於一個RDD隊列創建一個輸入源
    val inputStream = ssc.queueStream(rddQueue)
    val mappedStream = inputStream.map(x => (x % 10,1))
    val reduceStream = mappedStream.reduceByKey(_ + _)
    reduceStream.print
    ssc.start()
    for(i <- 1 to 30){
      rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)   //創建RDD,並分配兩個核數
      Thread.sleep(1000)                                  
    }
    ssc.stop()
  }
}
輸出
-------------------------------------------
Time: 1459595433000 ms //第1個輸出
-------------------------------------------
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
............
............
-------------------------------------------
Time: 1459595463000 ms //第30個輸出
-------------------------------------------
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
 
4.帶狀態的處理staefull
updateStateByKey操作:使用updateStateByKey操作的地是為了保留key的狀態,並能持續的更新;使用此功能有如下兩個步驟:
  1.定義狀態,這個狀態可以是任意的數據類型
  2.定義狀態更新函數, 指定一個函數根據之前的狀態來確定如何更新狀態。
 
同樣以wordCount作為例子,不同的是每一次的輸出都會累計之前的wordCount
StateFull:
object StateFull {
  def main(args: Array[String]) {
    //定義狀態更新函數
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val conf = new SparkConf().setMaster("local[2]").setAppName("stateFull")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint(".")    //設置檢查點,存儲位置是當前目錄,檢查點具有容錯機制
    val lines = sc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}
先運行之前GenerateChar來產生字母,再運行StateFull,結果如下:
-------------------------------------------
Time: 1459597690000 ms
-------------------------------------------
(B,3)
(F,1)
(D,1)
(G,1)
(C,1)
-------------------------------------------
Time: 1459597700000 ms //會累計之前的值
-------------------------------------------
(B,5)
(F,3)
(D,4)
(G,4)
(A,2)
(E,5)
(C,4)

 

Spark Straming最大的優點在於處理數據采用的是粗粒度的處理方式(一次處理一小批的數據),這種特性也更方便地實現容錯恢復機制,其DStream是在RDD上的高級

抽象,所以其極容易與RDD進行互操作。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM