摘要:
Sprak Streaming屬於Saprk API的擴展,支持實時數據流(live data streams)的可擴展,高吞吐(hight-throughput) 容錯(fault-tolerant)的流處理。可以接受來自KafKa,Flume,ZeroMQ Kinesis Twitter或TCP套接字的數據源,處理的結果數據可以存儲到文件系統 數據庫 現場dashboards等。
DStream編程模型
Dstream是Spark streaming中的高級抽象連續數據流,這個數據源可以從外部獲得(如KafKa Flume等),也可以通過輸入流獲得,還可以通過在其他DStream上進行高級操作創建,DStream是通過一組時間序列上連續的RDD表示的,所以一個DStream可以看作是一個RDDs的序列。
DStream操作
1.套接字流:通過監聽Socket端口來接收數據。
通過Scala編寫程序來產生一系列的字符作為輸入流:
GenerateChar:
object GenerateChar { def generateContext(index : Int) : String = { import scala.collection.mutable.ListBuffer val charList = ListBuffer[Char]() for(i <- 65 to 90) charList += i.toChar val charArray = charList.toArray charArray(index).toString } def index = { import java.util.Random val rdm = new Random rdm.nextInt(7) } def main(args: Array[String]) { val listener = new ServerSocket(9998) while(true){ val socket = listener.accept() new Thread(){ override def run() = { println("Got client connected from :"+ socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream,true) while(true){ Thread.sleep(500) val context = generateContext(index) //產生的字符是字母表的前七個隨機字母 println(context) out.write(context + '\n') out.flush() } socket.close() } }.start() } } }
ScoketStreaming:
object ScoketStreaming { def main(args: Array[String]) { //創建一個本地的StreamingContext,含2個工作線程 val conf = new SparkConf().setMaster("local[2]").setAppName("ScoketStreaming") val sc = new StreamingContext(conf,Seconds(10)) //每隔10秒統計一次字符總數 //創建珍一個DStream,連接master:9998 val lines = sc.socketTextStream("master",9998) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x , 1)).reduceByKey(_ + _) wordCounts.print() sc.start() //開始計算 sc.awaitTermination() //通過手動終止計算,否則一直運行下去 } }
運行結果:
GenerateChar產生的數據如下:
Got client connected from :/192.168.31.128
C
G
B
C
F
G
D
G
B
ScoketStreaming運行結果:
------------------------------------------- Time: 1459426750000 ms ------------------------------------------- (B,1) (G,1) (C,1) ------------------------------------------- Time: 1459426760000 ms ------------------------------------------- (B,5) (F,3) (D,4) (G,3) (C,3) (E,1)
注意:如果是在本地運行的,setMaster的參數必須為local[n],n >1,官網解釋:
When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either ofthese means that only one thread
will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single
thread will be used to run the receiver,leaving no thread for processing the received data. 當在本地運行Spark Streaming程序時,Master的URL不能設置為"local"或"local[1]",這兩種設置都意味着你將會只有一個線程來運行作業,如果你的Input DStream基於一個接收器
(如Kafka,Flume等),那么只有一個線程來接收數據,而沒有多余的線程來處理接收到的數據。
如果是在集群上運行,為Spark streaming應分配的核數應該在大於接收器的數據,否則同樣只接收了數據而沒有能力處理。
2.文件流:Spark Streaming通過監控文件系統的變化,若有新文件添加,則將它讀入並作為數據流
需要注意的是:
1.這些文件具有相同的格式
2.這些文件通過原子移動或重命名文件的方式在dataDirectory創建
3.一旦移動這些文件,就不能再進行修改,如果在文件中追加內容,這些追加的新數據也不會被讀取。
FileStreaming:
object FileStreaming { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("FileStreaming") val sc = new StreamingContext(conf,Seconds(5)) val lines = sc.textFileStream("/home/hadoop/wordCount") val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x , 1)).reduceByKey(_ + _) sc.start() sc.awaitTermination() } }
當你在文件目錄里添加文件時,Spark Streaming就會自動幫你讀入並計算 ,可以讀取本地目錄 HDFS和其他文件系統。
注意:文件流不需要運行接收器,所以不需要分配核數
3.RDD隊列流:使用streamingContext.queueStream(queueOfRDD)創建基於RDD隊列的DStream,用於調試Spark Streaming應用程序。
QueueStream:程序每隔1秒就創建一個RDD,Streaming每隔1秒就就對數據進行處理
object QueueStream { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("queueStream") //每1秒對數據進行處理 val ssc = new StreamingContext(conf,Seconds(1)) //創建一個能夠push到QueueInputDStream的RDDs隊列 val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]() //基於一個RDD隊列創建一個輸入源 val inputStream = ssc.queueStream(rddQueue) val mappedStream = inputStream.map(x => (x % 10,1)) val reduceStream = mappedStream.reduceByKey(_ + _) reduceStream.print ssc.start() for(i <- 1 to 30){ rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2) //創建RDD,並分配兩個核數 Thread.sleep(1000) } ssc.stop() } }
輸出
------------------------------------------- Time: 1459595433000 ms //第1個輸出 ------------------------------------------- (4,10) (0,10) (6,10) (8,10) (2,10) (1,10) (3,10) (7,10) (9,10) (5,10) ............ ............ ------------------------------------------- Time: 1459595463000 ms //第30個輸出 ------------------------------------------- (4,10) (0,10) (6,10) (8,10) (2,10) (1,10) (3,10) (7,10) (9,10) (5,10)
4.帶狀態的處理staefull
updateStateByKey操作:使用updateStateByKey操作的地是為了保留key的狀態,並能持續的更新;使用此功能有如下兩個步驟:
1.定義狀態,這個狀態可以是任意的數據類型
2.定義狀態更新函數, 指定一個函數根據之前的狀態來確定如何更新狀態。
同樣以wordCount作為例子,不同的是每一次的輸出都會累計之前的wordCount
StateFull:
object StateFull { def main(args: Array[String]) { //定義狀態更新函數 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("stateFull") val sc = new StreamingContext(conf, Seconds(5)) sc.checkpoint(".") //設置檢查點,存儲位置是當前目錄,檢查點具有容錯機制 val lines = sc.socketTextStream("master", 9999) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) stateDstream.print() sc.start() sc.awaitTermination() } }
先運行之前GenerateChar來產生字母,再運行StateFull,結果如下:
------------------------------------------- Time: 1459597690000 ms ------------------------------------------- (B,3) (F,1) (D,1) (G,1) (C,1) ------------------------------------------- Time: 1459597700000 ms //會累計之前的值 ------------------------------------------- (B,5) (F,3) (D,4) (G,4) (A,2) (E,5) (C,4)
Spark Straming最大的優點在於處理數據采用的是粗粒度的處理方式(一次處理一小批的數據),這種特性也更方便地實現容錯恢復機制,其DStream是在RDD上的高級
抽象,所以其極容易與RDD進行互操作。