用實例講解Spark Sreaming--轉


原文地址:http://www.infoq.com/cn/articles/spark-sreaming-practice

本篇文章用Spark Streaming +Hbase為列,Spark Streaming專為流式數據處理,對Spark核心API進行了相應的擴展。

什么是Spark Streaming?

首先,什么是流式處理呢?數據流是一個數據持續不斷到達的無邊界序列集。流式處理是把連續不斷的數據輸入分割成單元數據塊來處理。流式處理是一個低延遲的處理和流式數據分析。Spark Streaming對Spark核心API進行了相應的擴展,支持高吞吐、低延遲、可擴展的流式數據處理。實時數據處理應用的場景有下面幾個:

  • 網站監控和網絡監控;
  • 異常監測;
  • 網頁點擊;
  • 廣告數據;

物聯網(IOT)
此處輸入圖片的描述
圖1

Spark Streaming支持的數據源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,數據流可以通過Spark核心API、DataFrame SQL或者機器學習API處理,並可以持久化到本地文件、HDFS、數據庫或者其它任意支持Hadoop輸出格式的形式。

Spark Streaming如何工作?

Spark Streaming以X秒(batch size)為時間間隔把數據流分割成Dstream,組成一個RDD序列。你的Spark應用處理RDD,並把處理的結果批量返回。
此處輸入圖片的描述
圖2

Spark Streaming例子的架構圖

此處輸入圖片的描述
圖3

Spark Streaming例子代碼分下面幾部分:
- 讀取流式數據;
- 處理流式數據;
- 寫處理結果倒Hbase表。

Spark處理部分的代碼涉及到如下內容:

  • 讀取Hbase表的數據;
  • 按天計算數據統計;
  • 寫統計結果到Hbase表,列簇:stats。

數據集

數據集來自油泵信號數據,以CSV格式存儲在指定目錄下。Spark Streaming監控此目錄,CSV文件的格式如圖3。
此處輸入圖片的描述
圖4

采用Scala的case class來定義數據表結構,parseSensor函數解析逗號分隔的數據。

Hbase表結構

流式處理的Hbase表結構如下:

  • 油泵名字 + 日期 + 時間戳 組合成row key;
  • 列簇是由輸入數據列、報警數據列等組成,並設置過期時間。
  • 每天等統計數據表結構如下:
  • 油泵名和日期組成row key;

列簇為stats,包含列有最大值、最小值和平均值;
此處輸入圖片的描述
圖5

配置寫入Hbase表

Spark直接用TableOutputFormat類寫數據到Hbase里,跟在MapReduce中寫數據到Hbase表一樣,下面就直接用TableOutputFormat類了。

Spark Streaming代碼

Spark Streaming的基本步驟:

  • 初始化Spark StreamingContext對象;
  • 在DStream上進行transformation操作和輸出操作;
  • 開始接收數據並用streamingContext.start();
  • 等待處理停止,streamingContext.awaitTermination()。

初始化Spark StreamingContext對象

創建 StreamingContext對象,StreamingContext是Spark Streaming處理的入口,這里設置2秒的時間間隔。

val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

接下來用StreamingContext的textFileStream(directory)創建輸入流跟蹤Hadoop文件系統的新文件,並處理此目錄下的所有文件,這里directory指文件目錄。

// create a DStream that represents streaming data from a directory source val linesDStream = ssc.textFileStream("/user/user01/stream")

linesDStream是數據流,每條記錄是按行記錄的text格式。
此處輸入圖片的描述
圖6

對DStream進行transformation操作和輸出操作

接下來進行解析,對linesDStream進行map操作,map操作是對RDD應用Sensor.parseSensor函數,返回Sensor的RDD。

// parse each line of data in linesDStream into sensor objects val sensorDStream = linesDStream.map(Sensor.parseSensor)

此處輸入圖片的描述
圖7

對DStream的每個RDD執行foreachRDD 方法,使用filter過濾Sensor中低psi值來創建報警,使用Hbase的Put對象轉換sensor和alter數據以便能寫入到Hbase。然后使用PairRDDFunctions的saveAsHadoopDataset方法將最終結果寫入到任何Hadoop兼容到存儲系統。

// for each RDD. performs function on each RDD in DStream sensorRDD.foreachRDD { rdd => // filter sensor data for low psi val alertRDD = rdd.filter(sensor => sensor.psi < 5.0) // convert sensor data to put object and write to HBase Table CF data rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig) // convert alert to put object write to HBase Table CF alerts rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig) }

sensorRDD經過Put對象轉換,然后寫入到Hbase。
此處輸入圖片的描述
圖8

開始接收數據

通過streamingContext.start()顯式的啟動數據接收,然后調用streamingContext.awaitTermination()來等待計算完成。

// Start the computation ssc.start() // Wait for the computation to terminate ssc.awaitTermination()

Spark讀寫Hbase

現在開始讀取Hbase的sensor表,計算每條的統計指標並把對應的數據寫入stats列簇。
此處輸入圖片的描述
圖9

下面的代碼讀取Hbase的sensor表psi列數據,用StatCounter計算統計數據,然后寫入stats列簇。

// configure HBase for reading val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName) // scan data column family psi column conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") // Load an RDD of (row key, row Result) tuples from the table val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) // transform (row key, row Result) tuples into an RDD of Results val resultRDD = hBaseRDD.map(tuple => tuple._2) // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key val keyValueRDD = resultRDD. map(result => (Bytes.toString(result.getRow()). split(" ")(0), Bytes.toDouble(result.value))) // group by rowkey , get statistics for column value val keyStatsRDD = keyValueRDD. groupByKey(). mapValues(list => StatCounter(list)) // convert rowkey, stats to put and write to hbase table stats column family keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

下面的流程圖顯示newAPIHadoopRDD輸出,(row key,result)的鍵值對。PairRDDFunctions 的saveAsHadoopDataset方法把Put對象存入到Hbase。
此處輸入圖片的描述
圖10

運行Spark Streaming應用

運行Spark Streaming應用跟運行Spark應用類似,比較簡單,此處不贅述,參見Spark Streaming官方文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM