通過案例對 spark streaming 透徹理解三板斧之一: spark streaming 另類實驗


 

本期內容 :

  • spark streaming另類在線實驗
  • 瞬間理解spark streaming本質

 

一.  我們最開始將從Spark Streaming入手

為何從Spark Streaming切入Spark定制?Spark的子框架已有若干,為何選擇Spark Streaming?讓我們細細道來。

1.  Spark最開始只有Spark Core,沒有目前的這些子框架。這些子框架是構建於Spark Core之上的。沒有哪個子框架能擺脫Spark Core。我們通過對一個框架的徹底研究,肯定可以領會Spark力量的源泉,並精通所有問題的解決之道。

2.  我們再看看目前的這些子框架。Spark SQL有太多語法,研究這些會太浪費精力。SparkR還沒完善。Spark GraphX已無太多可改進之處,圖計算相關的數學知識也不是目前重點。Spark MLlib中的機器學習也有太多算法是與數學相關,也不是做改進的好的選擇 。所以我們選擇了Spark Streaming。

二 . 對Spark Streaming的理解

1. Spark Streaming是流式計算,當今時代是一個流處理時代,一切數據如果不是流式處理, 或者說和流式處理不相關的話,都是無效的數據。

2. 流式處理才是我們對大數據的初步印象,而不是批處理和數據挖掘,當然Spark強悍的地方在於,他的流式處理可以在線的直接使用機器學習、圖計算、SparkSQL、Spark R的成果。

3. 整個Spark的程序,基於Spark Streaming的最容易出問題,也是最受關注的地方,也是最需要人才的地方。

4. Spark Streaming和其他子框架的不同之處,Spark Streaming很像基於Spark Core之上的應用程序。

5. 尋龍點穴,Spark就是龍脈,Spark Streaming就是穴位

 

2015年是流式處理的一年。大家考慮用Spark,主要也是因為Spark Streaming。這是一個流處理的時代,一切數據如果與流式處理不相關的話,都是無效的數據。Spark之所以強悍的一個重要原因在於,它的流式處理可以在線使用圖計算、機器學習或者SparkR的成果,這得益於Spark一體化、多元化的基礎架構設計。也就是在Spark Streaming中可以調用其它子框架,無需任何設置。這是Spark的無可匹敵之處,也是Spark Streaming必將一統天下的根源。但Spark的應用中,Spark Streaming也是最容易出問題的。

Spark Streaming與其它子框架不同之處在於,它更像是Spark Core之上的一個應用程序。所以如果要做Spark的定制開發,Spark Streaming則提供了最好的參考。你想掌握Spark Streaming,但你不去精通Spark Core的話,那是不可能的。所以我們選擇Spark Streaming來提升自己,是找到了關鍵點。

 

三 .Spark Streaming另類在線實驗

       我們在研究Spark Streaming的過程中,會有困惑的事情:如何清晰的看到數據的流入、被處理的過程?

       使用一個小技巧,通過調節放大Batch Interval的方式,來降低批處理次數,以方便看清楚各個環節。

       我們從已寫過的廣告點擊的在線黑名單過濾的Spark Streaming應用程序入手。

  

  1. 案例代碼 :

import org.apache.spark.SparkConf  
import org.apache.spark.streaming.StreamingContext  
import org.apache.spark.streaming.Seconds  

/**
* Created by hadoop on 2016/4/18.
* 背景描述 在廣告點擊計費系統中 我們在線過濾掉 黑名單的點擊 進而保護廣告商的利益
* 只有效的廣告點擊計費
* 
*/
object OnlineBlackListFilter { def main(args: Array[String]){ /** * 第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息, * 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置 * 為local,則代表Spark程序在本地運行,特別適合於機器配置條件非常差(例如 * 只有1G的內存)的初學者。 */ // 創建SparkConf對象 val conf = new SparkConf() // 設置應用程序的名稱,在程序運行的監控界面可以看到名稱 conf.setAppName("OnlineBlackListFilter") // 此時,程序在Spark集群 conf.setMaster("spark://Master:7077") val ssc = new StreamingContext(conf, Seconds(30)) /** * 黑名單數據准備,實際上黑名單一般都是動態的,例如在Redis或者數據庫中, * 黑名單的生成往往有復雜的業務邏輯,具體情況算法不同, * 但是在Spark Streaming進行處理的時候每次都能夠訪問完整的信息。 */ val blackList = Array(("Spy", true),("Cheater", true)) val blackListRDD = ssc.sparkContext.parallelize(blackList, 8) val adsClickStream = ssc.socketTextStream("Master", 9999) /** * 此處模擬的廣告點擊的每條數據的格式為:time、name * 此處map操作的結果是name、(time,name)的格式 */ val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) } adsClickStreamFormatted.transform(userClickRDD => { // 通過leftOuterJoin操作既保留了左側用戶廣告點擊內容的RDD的所有內容, // 又獲得了相應點擊內容是否在黑名單中 val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /** * 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean)) * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在的值。 * 如果存在的話,表面當前廣告點擊是黑名單,需要過濾掉,否則的話是有效點擊內容; */ val validClicked = joinedBlackListRDD.filter(joinedItem => { if(joinedItem._2._2.getOrElse(false)) { false } else { true } }) validClicked.map(validClick => {validClick._2._1}) }).print /** * 計算后的有效數據一般都會寫入Kafka中,下游的計費系統會從kafka中pull到有效數據進行計費 */ ssc.start() ssc.awaitTermination() } }

   

  2.  把程序的Batch Interval設置成300秒:

   1 val ssc = new StreamingContext(conf, Seconds(300)) 

  3.  重新生成一下jar包 。

  4.  啟動Spark的History Server,打開數據發送的端口 : nc -lk 9999

  5.  用spark-submit運行前面生成的jar包 。

  6.  在數據發送端口輸入若干數據,比如:

    1375864674543 Tom
    1375864674553 Spy
    1375864674571 Andy
    1375864688436 Cheater
    1375864784240 Kelvin
    1375864853892 Steven
    1375864979347 John

  

  7. 打開瀏覽器,看History Server的日志信息:

  

   點擊最新的應用,看我們目前運行的應用程序中有些什么Job:

   總共竟然有5個Job。這完全不是我們此前做Spark SQL之類的應用程序時看到的樣子。

   我們接下來看一看這些Job的內容,主要揭示一些現象,不會做完全深入的剖析,只是為了先讓大家進行一些思考。

        Job 0:此Job不體現我們的業務邏輯代碼。這個Job是出於對后面計算的負載均衡的考慮。

   

   Job 0包含有Stage 0、Stage 1。隨便看一個Stage,比如Stage 1。看看其中的Aggregated Metrics by Executor部分:

    發現此Stage在所有Executor上都存在。 

   Job 1:運行時間比較長,耗時1.5分鍾。

   點擊Stage 2的鏈接,進去看看Aggregated Metrics By Executor部分:

   

 

   可以知道,Stage 2只在Worker4上的一個Executor執行,而且執行了1.5分鍾。

   是否會覺得奇怪:從業務處理的角度看,我們發送的那么一點數據,沒有必要去啟動一個運行1.5分鍾的任務吧。那這個任務是做什么呢?

    從DAG Visualization部分,就知道此Job實際就是啟動了一個接收數據的Receiver:

   原來Receiver是通過一個Job來啟動的。那肯定有一個Action來觸發它。

   看看Tasks部分: 

 

   只有一個Worker運行此Job。是用於接收數據。

   Locality Level是PROCESS_LOCAL,原來是內存節點。所以,默認情況下,數據接收不會使用磁盤,而是直接使用內存中的數據。

   看來,Spark Streaming應用程序啟動后,自己會啟動一些Job。默認啟動了一個Job來接收數據,為后續處理做准備。

   重要啟示:一個Spark應用程序中可以啟動很多Job,而這些不同的Job之間可以相互配合。這一認識為我們寫復雜Spark程序奠定了良好的基礎。

       

   Job 2:看Details可以發現有我們程序的主要業務邏輯,體現在Stag 3、Stag4、Stag 5中。

   

 

  我們看Stag3、Stage4的詳情,可以知道這2個Stage都是用4個Executor執行的。所有數據處理是在4台機器上進行的。

   Stag 5只在Worker4上。這是因為這個Stage有Shuffle操作。

   Job3:有Stage 6、Stage 7、Stage 8。其中Stage 6、Stage 7被跳過。

    

   看看Stage 8的Aggregated Metrics by Executor部分。可以看到,數據處理是在4台機器上進行的:

   Job4:也體現了我們應用程序中的業務邏輯 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage 10被跳過。

    

 

   看看Stage 11的詳情。可以看到,數據處理是在Worker4之外的其它3台機器上進行的:

    

   綜合以上的現象可以知道,Spark Streaming的一個應用中,運行了這么多Job,遠不是我們從網絡博客或者書籍上看的那么簡單。

   我們有必要通過這些現象,反過來回溯去尋根問源。不過這次暫不做深入分析。

   我們的神奇之旅才剛剛開始。

 

四.  瞬間理解Spark Streaming本質

  

  以上的連續4個圖,分別對應以下4個段落的描述:

    Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各種來源的實時輸入數據,進行處理后,處理結果保存在HDFS、Databases等各種地方。

    Spark Streaming接收這些實時輸入數據流,會將它們按批次划分,然后交給Spark引擎處理,生成按照批次划分的結果流。

    Spark Streaming提供了表示連續數據流的、高度抽象的被稱為離散流的DStream。DStream本質上表示RDD的序列。任何對DStream的操作都會轉變為對底層RDD的操作。

    Spark Streaming使用數據源產生的數據流創建DStream,也可以在已有的DStream上使用一些操作來創建新的DStream。

 

  在我們前面的實驗中,每300秒會產生一批數據,基於這批數據會生成RDD,進而觸發Job,執行處理。

  DStream是一個沒有邊界的集合,沒有大小的限制。

  DStream代表了時空的概念。隨着時間的推移,里面不斷產生RDD。

  鎖定到時間段后,就是空間的操作。也就是對本時間段的對應批次的數據的處理。

 

  下面用實例來講解數據處理過程。

  數據處理會有若干個對DStream的操作,這些操作之間的依賴關系,構成了DStreamGraph。如以下圖例所示:   

  上圖中每個foreach都會觸發一個作業,就會順着依賴從后往前回溯,形成DAG,如下圖所示:

  

  空間維度確定之后,隨着時間不斷推進,會不斷實例化RDD Graph,然后觸發Job去執行處理。

      現在再去讀官方的Spark Streaming的文檔,就好理解多了。

  

    看來我們的學習,將從Spark Streaming的現象開始,深入到Spark Core和Spark Streaming的本質。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM