一、SparkCore、SparkSQL和SparkStreaming的類似之處
二、SparkStreaming的運行流程
2.1 圖解說明
2.2 文字解說
1、我們在集群中的其中一台機器上提交我們的Application Jar,然后就會產生一個Application,開啟一個Driver,然后初始化SparkStreaming的程序入口StreamingContext;
2、Master會為這個Application的運行分配資源,在集群中的一台或者多台Worker上面開啟Excuter,executer會向Driver注冊;
3、Driver服務器會發送多個receiver給開啟的excuter,(receiver是一個接收器,是用來接收消息的,在excuter里面運行的時候,其實就相當於一個task任務)
4、receiver接收到數據后,每隔200ms就生成一個block塊,就是一個rdd的分區,然后這些block塊就存儲在executer里面,block塊的存儲級別是Memory_And_Disk_2;
5、receiver產生了這些block塊后會把這些block塊的信息發送給StreamingContext;
6、StreamingContext接收到這些數據后,會根據一定的規則將這些產生的block塊定義成一個rdd;
三、SparkStreaming的3個組成部分
四、 離散流(DStream)
五、小栗子
5.1 簡單的單詞計數
Scala代碼
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object NetWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sparkContext = new SparkContext(conf) val sc = new StreamingContext(sparkContext,Seconds(2)) /** * 數據的輸入 * */ val inDStream: ReceiverInputDStream[String] = sc.socketTextStream("bigdata",9999) inDStream.print() /** * 數據的處理 * */ val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_) /** * 數據的輸出 * */ resultDStream.print() /** *啟動應用程序 * */ sc.start() sc.awaitTermination() sc.stop() } }
在Linux上執行以下命令
運行結果
5.2 監控HDFS上的一個目錄
HDFS上的目錄需要先創建
Scala代碼
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} object HDFSWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val sc = new StreamingContext(conf,Seconds(2)) val inDStream: DStream[String] = sc.textFileStream("hdfs://hadoop1:9000/streaming") val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_) resultDStream.print() sc.start() sc.awaitTermination() sc.stop() } }
Linux上的命令
student.txt
95002,劉晨,女,19,IS 95017,王風娟,女,18,IS 95018,王一,女,19,IS 95013,馮偉,男,21,CS 95014,王小麗,女,19,CS 95019,邢小麗,女,19,IS
運行結果,默認展示的10條
5.3 第二次運行的時候更新原先的結果
Scala代碼
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} object UpdateWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") System.setProperty("HADOOP_USER_NAME","hadoop") val sparkContext = new SparkContext(conf) val sc = new StreamingContext(sparkContext,Seconds(2)) sc.checkpoint("hdfs://hadoop1:9000/streaming") val inDStream: ReceiverInputDStream[String] = sc.socketTextStream("hadoop1",9999) val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(",")) .map((_, 1)) .updateStateByKey((values: Seq[Int], state: Option[Int]) => { val currentCount: Int = values.sum val lastCount: Int = state.getOrElse(0) Some(currentCount + lastCount) }) resultDStream.print() sc.start() sc.awaitTermination() sc.stop() } }
Linux運行命令
運行結果
5.4 DriverHA
5.3的代碼一直運行,結果可以一直累加,但是代碼一旦停止運行,再次運行時,結果會不會接着上一次進行計算,上一次的計算結果丟失了,主要原因上每次程序運行都會初始化一個程序入口,而2次運行的程序入口不是同一個入口,所以會導致第一次計算的結果丟失,第一次的運算結果狀態保存在Driver里面,所以我們如果想用上一次的計算結果,我們需要將上一次的Driver里面的運行結果狀態取出來,而5.3里面的代碼有一個checkpoint方法,它會把上一次Driver里面的運算結果狀態保存在checkpoint的目錄里面,我們在第二次啟動程序時,從checkpoint里面取出上一次的運行結果狀態,把這次的Driver狀態恢復成和上一次Driver一樣的狀態