大數據開發實戰:Spark Streaming流計算開發


  1、背景介紹

      Storm以及離線數據平台的MapReduce和Hive構成了Hadoop生態對實時和離線數據處理的一套完整處理解決方案。除了此套解決方案之外,還有一種非常流行的而且完整的離線和

    實時數據處理方案。這種方案就是Spark。Spark本質上是對Hadoop特別是MapReduce的補充、優化和完善,尤其是數據處理速度、易用性、迭代計算和復雜數據分析等方面。

      Spark Streaming 作為Spark整體解決方案中實時數據處理部分,本質上仍然是基於Spark的彈性分布式數據集(Resilient Distributed Datasets :RDD)概念。Spark Streaming將源頭

    數據划分為很小的批,並以類似於離線批的方式來處理這部分微批數據。

      相對於Storm這種原生的實時處理框架,Spark Streaming基於微批的的方案帶來了吞吐量的提升,但是也導致了數據處理延遲的增加---基於Spark Streaming實時數據處理方案的數據

    延遲通常在秒級甚至分鍾級。

  2、Spark生態和核心概念

    2.1、Spark概覽

      Spark誕生於美國伯克利大學的AMPLab,它最初屬於伯克利大學的研究性項目,與2010年正式開源,於2013年成為Apache基金項目,冰雨2014年成為Apache基金的頂級項目。

      Spark用了不到5年的時間就成了Apache的頂級項目,目前已被國內外的眾多互聯網公司使用,包括Amazon、EBay、淘寶、騰訊等。

      Spark的流行和它解決了Hadoop的很多不足密不可分。

      傳統Hadoop基於MapReduce的方案適用於大多數的離線批處理場景,但是對於實時查詢、迭代計算等場景非常不適合,這是有其內在局限決定的。

      1、MapReduce只提供Map和Reduce兩個操作,抽象程度低,但是復雜的計算通常需要很多操作,而且操作之間有復雜的依賴關系。

      2、MapReduce的中間處理結果是放在HDFS文件系統中的,每次的落地和讀取都消耗大量的時間和資源。

      3、當然,MapReduce也不支持高級數據處理API、DAG(有向五環圖)計算、迭代計算等。

      Spark則較好地解決了上述這些問題。

      1、Spark通過引入彈性分布式數據集(Resilient Distributed Datasets:RDD)以及RDD豐富的動作操作API,非常好地支持了DGA的計算和迭代計算。

      2、Spark通過內存計算和緩存數據非常好地支持了迭代計算和DAG計算的數據共享、減少了數據讀取的IO開銷、大大提高了數據處理速度。

      3、Spark為批處理(Spark Core)、流式處理(Spark Streaming)、交互分析(Spark SQL)、機器學習(MLLib)和圖計算(GraphX)提供了一個同一的平台和API,非常便於使用。

      4、Spark非常容易使用、Spark支持java、Python和Scala的API,還支持超過80種高級算法,使得用戶可以快速構建不同的應用。Spark支持交互式的Python和Scala的shell,這意味着

          可以非常方便地在這些shell中使用Spark集群來驗證解決問題的方法,而不是像以前一樣,需要打包、上傳集群、驗證等。這對於原型開發尤其重要。

      5、Spark可以非常方便地與其他開源產品進行融合:比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,並且可以處理所有Hadoop支持的數據,包括HDFS、

        HBase和Cassandra等。Spark也可以不依賴於第三方的資源管理和調度器,它實現了Standalone作為其內置的資源管理和調度框架,這樣進一步降低了Spark的使用門檻。

      6、External Data Source多數據源支持:Spark可以獨立運行,除了可以運行在當下的Yarn集群管理之外,它還可以讀取已有的Hadoop數據。它可以運行多種數據源,比如Parquet、Hive、

        HBase、HDFS等。

   2.2、Spark核心概念

        RDD是Spark中最為核心和重要的概念。RDD,全稱為 Resilient Distributed Dataset,在Spark官方文檔中被稱為“一個可並行操作的有容錯機制的數據集合”。實際上RDD就是

      一個數據集,而且是分布式的。同時Spark還對這個分布式數據集提供了豐富的數據操作和容錯性。

      1、RDD創建

        Spark中創建RDD最直接的方法是調用SparkContext(SparkContext是Spark集群環境的訪問入口,Spark Streaming也有自己對應的對象StreamContext)的parallelize方法。

        List<Integer> data =  Arrays.asList(1,2,3,4,5);

        HavaRDD<Integer> distData = sc.parallelize(data);

        上述代碼會將數據集合 (data)轉換為這個分布式數據集(distData),之后就可以對此RDD執行各種轉換等。比如調用distData.reduce((a,b) => a+b),將這個數組中的元素項加,

      此外,還可以通過設置parallelize的第二個參數手動設置生成RDD的分區數:sc.parallelize(data,10),如果不設定的話,Spark會自動設定。

        但在實際的項目中,RDD一般是從源頭數據創建的。Spark支持從任何一個Hadoop支持的存儲數據創建RDD,包括本地文件系統、HDFS、Cassandna、HBase和Amazon S3等。

      另外,Spark也支持從文本文件,SequenceFiles和其它Hadoop InputFormat的格式文件中創建RDD。創建的方法也很簡單,只需要指定源頭文件並調用對應的方法即可:

        JavaRDD<String> distFile = sc.textFile("data.txt");

        Spark 中轉換SequenceFile的SparkContext方法是sequenceFile,轉換Hadoop InputFormats的SparkContext方法是HadoopRDD。

      2、RDD操作

        RDD操作分為轉換(transformation)和行動(action),transformation是根據原有的RDD創建一個新的RDD,action則吧RDD操作后的結果返回給driver。例如map 是一個轉換,

      它把數據集中的每個元素經過一個方法處理的結果返回一個新的RDD,reduce是一個action,它收集RDD的所有數據經過一些方法的處理,最后把結果返回給driver。

        Spark對transformation的抽象可以大大提高性能,這是因為在Spark中,所有transformation操作都是lazy模式,即Spark不會立即計算結果,而只是簡單地記住所有對數據集的

      轉換操作邏。這些轉換只有遇到action操作的時候才會開始計算。這樣的設計使得Spark更加高效,例如可以通過map創建一個新數據集在reduce中使用,並僅僅返回reduce的

      結果給driver,而不是整個大大的map過的數據集。

      3、RDD持久化

        Spark最重要的一個功能是它可以通過各種操作持久化(或緩存)一個集合到內存中。當持久化一個RDD的時候,每一個節點都將參與計算的所有分區數據存儲到內存中,

      並且這些數據可以被這個集合(以及這個集合衍生的其他集合)的動作重復利用。這個能力使后續的動作速度更快(通常快10倍以上)。對應迭代算法和快速的交互應用來說,

      緩存是一個關鍵的工具。

        可以通過 persist()或者cache()方法持久化一個RDD。先在action中計算得到RDD,然后將其保存在每個節點的內存中。Spark的緩存是一個容錯的技術,也就是說,如果RDD的

      任何一個分區丟失,它可以通過原有的轉換操作自動重復計算並且創建出這個分區。

        此外,還可以利用不同的存儲級別存儲每一個被持久化的RDD,。例如,它允許持久化集合到磁盤上、將集合作為序列化的Java對象持久化到內存中、在節點間復制集合或存儲集合

      到Tachyon中。可以通過傳遞一個StorageLevel對象給persist()方式設置這些存儲級別。cache()使用了默認的存儲級別-----StorageLevel.MEMORY_ONLY。

      4、Spark生態圈

        Spark建立在統一抽象的RDD之上,使得它可以以基本一致的方式應對不同的大數據處理場景,包括批處理,流處理、SQL、Machine Learning以及GraphX等。這就是Spark設計的“

      通用的編程抽象”( Unified Programming Abstraction),也正是Spark獨特的地方。

        Spark生態圈包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件,其中Spark Core提供內存計算框架、SparkStreaming提供實時處理應用、Spark SQL提供

      即席查詢,再加上MLlib的機器學習和GraphX的圖處理,它們能無縫集成並提供Spark一站式的大數據解決平台和生態圈。

      

      Spark Core:Spark Core實現了Spark的基本功能,包括任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。Spark Core還包括了RDD的API定義,並提供了創建和操作RDD的

        豐富API。Spark Core是Spark其它組件的基礎和根本。

      Spark Streaming:他是Spark提供的對實時數據進行流計算的組件,提供了用來操作數據流的API,並且與Spark Core中的RDD API高度對應。Spark Streaming支持與Spark Core

        同級別的容錯性、吞吐量和伸縮性。

      Spark SQL:它是Spark用來操作結構化數據的程序包,通過Spark SQL,可以使用SQL或類SQL語言來查詢數據;同時Spark SQL支持多種數據源,比如Hive表、Parquet以及

        JSON等,除了為Spark提供一個SQL接口,Spark SQL還支持開發者將SQL和傳統的RDD編程的數據操作方式向結合,不論是使用Python、Java還是Scala,開發者都可以在

        單個應用中同時使用SQL和復雜的數據分析。

      MLLib:Spark提供了常見的機器學習功能的程序庫,叫做MLlib,MLlib提供了多種機器學習算法,包括分類、回歸、聚類、協同過濾等,還提供了模型評估、數據導入等額外的

        支持功能。此外,MLLib還提供了一些更底層的機器學習原語,包括一個通用的梯度下降優化算法,所有這些方法都被設計為可以在集群上輕松伸縮的架構。

      GraphX:GraphX是用來操作圖(如社交網絡的朋友圈)的程序庫,可以進行並行的圖計算。與Spark Streaming和Spark SQL類似,GraphX也擴展了Spark的RDD API,

        能用來創建一個頂點和邊都包含任意屬性的有向圖。GraphX還支持針對圖的各種操作(如進行圖分割的subgraph和操作所有頂點的mapVertices),以及一些常用的圖算法

       (如PageRank和三角計算)。

   3、Spark生態的流計算技術:Spark Streaming

        Spark Streaming作為Spark的核心組件之一,通Storm一樣,主要對數據進行實時的流處理,但是不同於Apache Storm(這里指的是原生Storm,非Trident),在Spark Streaming

      中數據處理的單位是是一批而不是一條,Spark會等采集的源頭數據累積到設置的間隔條件后,對數據進行統一的微批處理。這個間隔是Spark Streaming中的核心概念和關鍵參數,

      直接決定了Spark Streaming作業的數據處理延遲,當然也決定着數據處理的吞吐量和性能。

        相對於Storm的毫秒級延遲來說,Spark Streaming的延遲最多只能到幾百毫秒,一般是秒級甚至分鍾級,因此對於實時數據處理延遲要非常高的場合,Spark Streaming並不合適。

        另外,Spark Streaming底層依賴於Spark Core 的RDD實現,即它和Spark框架整體是綁定在一起的,這是優點也是缺點。

        對於已經采用Spark 作為大數據處理框架,同時對數據延遲性要求不是很高的場合,Spark Streaming非常適合作為事實流處理的工具和方案,原因如下:

        1、Spark Streaming內部的實現和調度方式高度依賴於Spark的DAG調度器和RDD,Spark Streaming的離散流(DStream)本質上是RDD在流式數據上的抽象,因此熟悉Spark和

        和RDD概念的用戶非常容易理解Spark Streaming已經其DSream。

        2、Spark上各個組件編程模型都是類似的,所以如果熟悉Spark的API,那么對Spark Streaming的API也非常容易上手和掌握。

        但是,如果已經采用了其他諸如Hadoop和Storm的數據處理方案,那么如果使用Spark Streaming,則面臨着Spark以及Spark Streaming的概念及原理的學習成本。

        總體來說,Spark Streaming作為Spark核心API的一個擴展,它對實時流式數據的處理具有可擴展性、高吞吐量、和可以容錯性等特點。

        同其他流處理框架一樣,Spark Streaming從Kafka、Flume、Twitter、ZeroMQ、Kinesis等源頭獲取數據,並map、reduce、join、window等組成的復雜算法計算出期望的結果,處理

      后的結果數據可被推送到文件系統,數據庫、實時儀表盤中,當然,也可以將處理后的數據應用到Spark的機器學習算法、圖處理算法中。整個的數據處理流程如下:

      

 

    3.1、Spark Streaming基本原理

      Spark Streaming 中基本的抽象是離散流(即DStream).DStream代表一個連續的數據流。在Spark Streaming內部中,DStream實際上是由一系列連續RDD組成。每個RDD包含確定

    時間間隔內的數據,這些離散的RDD連在一起,共同組成了對應的DStream。

    

      實際上任何,任何對DStream的操作都轉換成了對DStream隱含的一系列對應RDD的操作,如上圖中對lines DStream是的flatMap操作,實際上應用於lines對應每個RDD的操作,並生成了

    對應的work DStream的RDD。

      也就是上面所說的,Spark Streaming底層依賴於Spark Core的RDD實現。從本質上來說,Spark Streaming只不過是將流式的數據流根據設定的間隔分成了一系列的RDD,然后在每個RDD上

    應用相應的各種操作和協作,所以Spark Streaming底層的運行引擎實際上是Spark Core。

   3.2、Spark Streaming核心API

      SparkStreaming完整的API包括StreamingContext、DStream輸入、DStream上的各種操作和動作、DStream輸出等。

      1、StreamingContext

        為了初始化Spark Streaming程序,必須創建一個StreamingContext對象,該對象是Spark Streaming所有流操作的主要入口。一個StreamingContext對象可以用SparkConf對象創建:

        import org.apache.spark.*;

        import org.apache.spark.streaming.api.Java.*;

        SparkConf conf =  new SparkConf().setAppName(appName).setMaster(master);

        JavaStreamingContext ssc =  new JavaStreamingContext(conf, new Duration(1000));

      2、DStream輸入

        DStream輸入表示從數據源獲取的原始數據流。每個輸入流DStream和一個接收器(receiver)對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。

        Spark Streaming有兩類數據源:

        基本源(basic source):在StreamingContext API中直接可用的源頭,例如文件系統、套接字連接、Akka的actor等。

        高級源(advanced source):包括 Kafka、Flume、Kinesis、Tiwtter等,他們需要通過額外的類來使用。

      3、DStream的轉換

        和RDD類似,transformation用來對輸入DStreams的數據進行轉換、修改等各種操作,當然,DStream也支持很多在Spark RDD的transformation算子。

        

 

      4、DStream的輸出

        和RDD類似,Spark Streaming允許將DStream轉換后的結果發送到數據庫、文件系統等外部系統中。目前,定義了Spark Streaming的輸出操作:

        

 

  4、Spark Streaming實時開發實例

      下面用字符計數這個例子來說明 Spark Streming

      首先,導入 Spark Streaming的相關類到環境中,這些類(如DStream)提供了流操作很多有用的方法,StreamingContext是Spark所有流操作的主要入口。

      其次,創建一個具有兩個執行線程以及1秒批間隔時間(即以秒為單位分隔數據流)的本地StreamingContext.

      

import org.apache.spark.{*, SparkConf}
import org.apache.spark.api.java.function.*
import org.apache.spark.streaming.{*, Duration, Durations}
import org.apache.spark.streaming.api.java.{*, JavaDStream, JavaStreamingContext}

import scala.Tuple2;

object streaming_test {
  def main(args: Array[String]): Unit = {
    //創建一個本地的StreamingContext上下文對象,該對象包含兩個工作線程,批處理間隔為1秒
    val conf  = new SparkConf().setMaster("local[2]").setAppName("Network-WordCount");

    val jssc = new JavaStreamingContext(conf,Durations.seconds(1));
    //利用這個上下文,能夠創建一個DStream,它表示從TCP源(主機為localhost,端口為9999)獲取的流式數據
    //創建一個連接到hostname:port的DStream對象,類似localhost:9999
    val lines =jssc.socketTextStream("localhost",9999);
    //這個lines變量是一個DStream,表示即將從數據服務器或的數據流,這個DStream的每條記錄都代表一行文本,
    // 接下來需要將DStream中的每行文本都切分為單詞
    val words =lines.flatMap(x:String => util.Arrays.asList(x.split(" ")).iterator());
    val pairs =words.mapToPair<s=>new Tuple2<>(s,1));
    val wordCounts =pairs.reduceByKey((i1,i2)=> i1+i2);
    wordCounts.print();
  }
}

   4、Spark Streaming調優實踐

    Spark Streaming作業的調優通常都涉及作業開發的優化、並行度的優化和批大小以及內存等資源的優化。

    1、作業開發優化

      RDD復用:對於實時作業,尤其是鏈路較長的作業,要盡量重復使用RDD,而不是重復創建多個RDD。另外需要多次使用的中間RDD,可以將其持久化,以降低每次都需要重復計算的開銷。

      使用效率較高的shuffle算子:如同Hadoop中的作業一樣,實時作業的shuffle操作會涉及數據重新分布,因此會耗費大量的內存、網絡和計算等資源,需要盡量降低需要shuffle的數據量,

      reduceByKey/aggregateByKey相比groupByKey,會在map端先進行預聚合,因此效率較高。

      類似於Hive的MapJoin:對於實時作業,join也會涉及數據的重新分布,因此如果是大數據量的RDD和小數據量的RDD進行join,可以通過broadcast與map操作實現類似於Hive的MapJoin,

      但是需要注意小數量的RDD不能過大,不然廣播數據的開銷也很大。

      其它高效的例子:如使用mapPartitions替代普通map,使用foreachPartitions替代foreach,使用repartitionAndSortWithinPartitions替代repartition與 sort類操作等。

    2、並行度和批大小

      對於Spark Streaming這種基於微批處理和實時處理框架來說,其調優不外乎兩點:

      一是盡量縮短每一批次的處理時間

      二是設置合適的batch size(即每批處理的數據量),使得數據處理的速度能夠適配數據流入的速度。

      第一點通常以設置源頭、處理、輸出的並發度來實現。

      源頭並發:如果源頭的輸入任務是實時作業的瓶頸,那么可以通過加大源頭的並發度提供性能,來保證數據能夠流入后續的處理鏈路。在Spark Streaming,可以通過如下代碼來實現(

      一Kafka源頭為例):

      int numStreams = 5;

      List<JavaPairDStream<String,String>> kafkaStreams = new ArrayList<JavaPairDStream<String,String>>(numStreams );

      for(int i=0;i<numStreams ;i++){

        kafkaStreams.add(KafkaUtils.createStream(...));

      }

       JavaPairDStream<String,String> unifiedStream = streamingContext.union( kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

      處理並發:處理任務的並發決定了實際作業執行的物理視圖。Spark Streaming作業的默認並發度可以通過spark.default.parllelism來設置,但是實際中不推薦,建議針對每個任務單獨設置

      並發度進行精細控制。

      輸出並發:如圖Hadoop作業一樣,實時作業的shuffle操作會涉及數據重新分布,因此會耗費大量的內存、網絡和計算等資源,因此需要盡量減少shuffle操作。

      batch size:batch size主要影響系統的吞吐量和延遲。batch size 太小,一般處理延遲會降低,但是系統吞吐量會下降;batch size太大,吞吐量上去了,但是處理延遲會提高,同時要求的

      內存也會增加,因此實際中需要找到一個平衡點,既能滿足吞吐量也能滿足延遲的要求,那么實際中如何設置batch大小呢?

    參考資料:《離線和實時大數據開發實戰》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM