原文地址:http://www.infoq.com/cn/articles/spark-sreaming-practice
本篇文章用Spark Streaming +Hbase為列,Spark Streaming專為流式數據處理,對Spark核心API進行了相應的擴展。
什么是Spark Streaming?
首先,什么是流式處理呢?數據流是一個數據持續不斷到達的無邊界序列集。流式處理是把連續不斷的數據輸入分割成單元數據塊來處理。流式處理是一個低延遲的處理和流式數據分析。Spark Streaming對Spark核心API進行了相應的擴展,支持高吞吐、低延遲、可擴展的流式數據處理。實時數據處理應用的場景有下面幾個:
- 網站監控和網絡監控;
- 異常監測;
- 網頁點擊;
- 廣告數據;
物聯網(IOT)
圖1
Spark Streaming支持的數據源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,數據流可以通過Spark核心API、DataFrame SQL或者機器學習API處理,並可以持久化到本地文件、HDFS、數據庫或者其它任意支持Hadoop輸出格式的形式。
Spark Streaming如何工作?
Spark Streaming以X秒(batch size)為時間間隔把數據流分割成Dstream,組成一個RDD序列。你的Spark應用處理RDD,並把處理的結果批量返回。
圖2
Spark Streaming例子的架構圖
圖3
Spark Streaming例子代碼分下面幾部分:
- 讀取流式數據;
- 處理流式數據;
- 寫處理結果倒Hbase表。
Spark處理部分的代碼涉及到如下內容:
- 讀取Hbase表的數據;
- 按天計算數據統計;
- 寫統計結果到Hbase表,列簇:stats。
數據集
數據集來自油泵信號數據,以CSV格式存儲在指定目錄下。Spark Streaming監控此目錄,CSV文件的格式如圖3。
圖4
采用Scala的case class來定義數據表結構,parseSensor函數解析逗號分隔的數據。
Hbase表結構
流式處理的Hbase表結構如下:
- 油泵名字 + 日期 + 時間戳 組合成row key;
- 列簇是由輸入數據列、報警數據列等組成,並設置過期時間。
- 每天等統計數據表結構如下:
- 油泵名和日期組成row key;
列簇為stats,包含列有最大值、最小值和平均值;
圖5
Spark直接用TableOutputFormat類寫數據到Hbase里,跟在MapReduce中寫數據到Hbase表一樣,下面就直接用TableOutputFormat類了。
Spark Streaming的基本步驟:
- 初始化Spark StreamingContext對象;
- 在DStream上進行transformation操作和輸出操作;
- 開始接收數據並用streamingContext.start();
- 等待處理停止,streamingContext.awaitTermination()。
初始化Spark StreamingContext對象
創建 StreamingContext對象,StreamingContext是Spark Streaming處理的入口,這里設置2秒的時間間隔。
val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))
接下來用StreamingContext的textFileStream(directory)創建輸入流跟蹤Hadoop文件系統的新文件,並處理此目錄下的所有文件,這里directory指文件目錄。
// create a DStream that represents streaming data from a directory source val linesDStream = ssc.textFileStream("/user/user01/stream")
linesDStream是數據流,每條記錄是按行記錄的text格式。
圖6
對DStream進行transformation操作和輸出操作
接下來進行解析,對linesDStream進行map操作,map操作是對RDD應用Sensor.parseSensor函數,返回Sensor的RDD。
// parse each line of data in linesDStream into sensor objects val sensorDStream = linesDStream.map(Sensor.parseSensor)
圖7
對DStream的每個RDD執行foreachRDD 方法,使用filter過濾Sensor中低psi值來創建報警,使用Hbase的Put對象轉換sensor和alter數據以便能寫入到Hbase。然后使用PairRDDFunctions的saveAsHadoopDataset方法將最終結果寫入到任何Hadoop兼容到存儲系統。
// for each RDD. performs function on each RDD in DStream sensorRDD.foreachRDD { rdd => // filter sensor data for low psi val alertRDD = rdd.filter(sensor => sensor.psi < 5.0) // convert sensor data to put object and write to HBase Table CF data rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig) // convert alert to put object write to HBase Table CF alerts rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig) }
sensorRDD經過Put對象轉換,然后寫入到Hbase。
圖8
開始接收數據
通過streamingContext.start()顯式的啟動數據接收,然后調用streamingContext.awaitTermination()來等待計算完成。
// Start the computation ssc.start() // Wait for the computation to terminate ssc.awaitTermination()
Spark讀寫Hbase
現在開始讀取Hbase的sensor表,計算每條的統計指標並把對應的數據寫入stats列簇。
圖9
下面的代碼讀取Hbase的sensor表psi列數據,用StatCounter計算統計數據,然后寫入stats列簇。
// configure HBase for reading val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName) // scan data column family psi column conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") // Load an RDD of (row key, row Result) tuples from the table val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) // transform (row key, row Result) tuples into an RDD of Results val resultRDD = hBaseRDD.map(tuple => tuple._2) // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key val keyValueRDD = resultRDD. map(result => (Bytes.toString(result.getRow()). split(" ")(0), Bytes.toDouble(result.value))) // group by rowkey , get statistics for column value val keyStatsRDD = keyValueRDD. groupByKey(). mapValues(list => StatCounter(list)) // convert rowkey, stats to put and write to hbase table stats column family keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)
下面的流程圖顯示newAPIHadoopRDD輸出,(row key,result)的鍵值對。PairRDDFunctions 的saveAsHadoopDataset方法把Put對象存入到Hbase。
圖10
運行Spark Streaming應用
運行Spark Streaming應用跟運行Spark應用類似,比較簡單,此處不贅述,參見Spark Streaming官方文檔。