spark系列-2、Spark 核心數據結構:彈性分布式數據集 RDD


一、RDD(彈性分布式數據集)

RDD 是 Spark 最核心的數據結構,RDD(Resilient Distributed Dataset)全稱為彈性分布式數據集,是 Spark 對數據的核心抽象,也是最關鍵的抽象,它實質上是一組分布式的 JVM 不可變對象集合,不可變決定了它是只讀的,所以 RDD 在經過變換產生新的 RDD 時,原有 RDD 不會改變。

1.1、設計背景

在實際應用中,存在許多迭代式計算,這些應用場景的共同之處是,不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入。但是,目前的MapReduce框架都是把中間結果寫入到HDFS中,帶來了大量的數據復制、磁盤IO和序列化開銷。顯然,如果能將結果保存在內存當中,就可以大量減少IO。RDD就是為了滿足這種需求而出現的,它提供了一個抽象的數據架構,我們不必擔心底層數據的分布式特性,只需將具體的應用邏輯表達為一系列轉換處理,不同RDD之間的轉換操作形成依賴關系,可以實現管道化,從而避免了中間結果的落地存儲,大大降低了數據復制、磁盤IO和序列化開銷。

1.2、RDD概念

一個RDD就是一個分布式對象集合,本質上是一個只讀的分區記錄集合,每個RDD可以分成多個分區,每個分區就是一個數據集片段(HDFS上的塊),並且一個RDD的不同分區可以被保存到集群中不同的節點上,從而可以在集群中的不同節點上進行並行計算。RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,不能直接修改,只能基於穩定的物理存儲中的數據集來創建RDD,或者通過在其他RDD上執行確定的轉換操作(如map、join和groupBy)而創建得到新的RDD。RDD提供了一組豐富的操作以支持常見的數據運算,分為“行動”(Action)和“轉換”(Transformation)兩種類型,前者用於執行計算並指定輸出的形式,后者指定RDD之間的相互依賴關系。兩類操作的主要區別是,轉換操作(比如map、filter、groupBy、join等)接受RDD並返回RDD,而行動操作(比如count、collect等)接受RDD但是返回非RDD(即輸出一個值或結果)。

RDD典型的執行過程如下:

  • RDD讀入外部數據源(或者內存中的集合)進行創建;
  • RDD經過一系列的“轉換”操作,每一次都會產生不同的RDD,供給下一個“轉換”使用;
  • 最后一個RDD經“行動”操作進行處理,並輸出到外部數據源(或者變成Scala/JAVA集合或變量)。
  • 需要說明的是,RDD采用了惰性調用,即在RDD的執行過程中,真正的計算發生在RDD的“行動”操作,對於“行動”之前的所有“轉換”操作,Spark只是記錄下“轉換”操作應用的一些基礎數據集以及RDD生成的軌跡,即相互之間的依賴關系,而不會觸發真正的計算(Action才會計算)。

如下圖所示:從輸入中邏輯上生成A和C兩個RDD,經過一系列“轉換”操作,邏輯上生成了F(也是一個RDD),之所以說是邏輯上,是因為這時候計算並沒有發生,Spark只是記錄了RDD之間的生成和依賴關系。當F要進行輸出時,也就是當F進行“行動”操作的時候,Spark才會根據RDD的依賴關系生成DAG,並從起點開始真正的計算。

這一系列處理稱為一個“血緣關系(Lineage)”,即DAG拓撲排序的結果。采用惰性調用,通過血緣關系連接起來的一系列RDD操作就可以實現管道化(pipeline)(A->B通過內存傳數據,避免了IO),避免了多次轉換操作之間數據同步的等待,而且不用擔心有過多的中間數據,因為這些具有血緣關系的操作都管道化了,一個操作得到的結果不需要保存為中間數據,而是直接管道式地流入到下一個操作進行處理。同時,這種通過血緣關系就是把一系列操作進行管道化連接的設計方式,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性;相反,在MapReduce的設計中,為了盡可能地減少MapReduce過程,在單個MapReduce中會寫入過多復雜的邏輯。

 程序示例:

val conf = new SparkConf
val sparkContext = new SparkContext(conf)
val sourceRDD :RDD = sparkContext.textFile(logFile)
val countRDD = sourceRDD.filter(_.contains("hello world")).cache().count()
println(countRDD)

可以看出,一個Spark應用程序,基本是基於RDD的一系列計算操作。

  • 第2行代碼用於創建JavaSparkContext對象;
  • 第3行代碼從HDFS文件中讀取數據創建一個RDD;
  • 第4行代碼對sourceRDD進行轉換操作得到一個新的RDD,即countRDD;
  • sourceRDD.cache()表示對lines進行持久化,把它保存在內存或磁盤中(這里采用cache接口把數據集保存在內存中),方便后續重復使用,當數據被反復訪問時(比如查詢一些熱點數據,或者運行迭代算法),這是非常有用的,而且通過cache()可以緩存非常大的數據集,支持跨越幾十甚至上百個節點(每個機器存自己負責的那一部分 Executor);sourceRDD.count()是一個行動操作,用於計算一個RDD集合中包含的元素個數。

這個程序的執行過程如下:

  • 創建這個Spark程序的執行上下文,即創建SparkContext對象;
  • 從外部數據源(即HDFS文件)中讀取數據創建sourceRDD對象;
  • 構建起sourceRDD和countRDD之間的依賴關系,形成DAG圖,這時候並沒有發生真正的計算,只是記錄轉換的軌跡;
  • 執行action代碼時,count()是一個行動類型的操作,觸發真正的計算,並把結果持久化到內存中,最后計算出sourceRDD中包含的元素個數。

1.3、RDD特性

總體而言,Spark采用RDD以后能夠實現高效計算的主要原因如下:

  • 高效的容錯性。現有的分布式共享內存、鍵值存儲、內存數據庫等,為了實現容錯,必須在集群節點之間進行數據復制或者記錄日志,也就是在節點之間會發生大量的數據傳輸,這對於數據密集型應用而言會帶來很大的開銷。在RDD的設計中,數據只讀,不可修改,如果需要修改數據,必須從父RDD轉換到子RDD,由此在不同RDD之間建立了血緣關系。所以,RDD是一種天生具有容錯機制的特殊集合,不需要通過數據冗余的方式(比如詳細的記錄操作的日志)實現容錯,而只需通過RDD父子依賴(血緣)關系重新計算得到丟失的分區來實現容錯,無需回滾整個系統,這樣就避免了數據復制的高開銷,而且重算過程可以在不同節點之間並行進行,實現了高效的容錯。此外,RDD提供的轉換操作都是一些粗粒度的操作(比如map、filter和join),RDD依賴關系只需要記錄這種粗粒度的轉換操作,而不需要記錄具體的數據和各種細粒度操作的日志(比如對哪個數據項進行了修改),這就大大降低了數據密集型應用中的容錯開銷
  • 中間結果持久化到內存。數據在內存中的多個RDD操作之間進行傳遞,不需要“落地”到磁盤上,避免了不必要的讀寫磁盤開銷;
  • 存放的數據可以是Java對象,避免了不必要的對象序列化和反序列化開銷。

1.4、RDD之間的依賴關系

RDD中不同的操作會使得不同RDD中的分區會產生不同的依賴。RDD中的依賴關系分為窄依賴(Narrow Dependency)與寬依賴(伴隨shuffle)(Wide Dependency)
兩種依賴之間的區別:

  • 窄依賴表現為一個或多個父RDD的分區對應於一個子RDD的分區;
  • 寬依賴則表現為存在一個父RDD的一個分區對應一個子RDD的多個分區

總體而言,如果父RDD的一個分區只被一個子RDD的一個分區所使用就是窄依賴,否則就是寬依賴。窄依賴典型的操作包括map、filter、union等,寬依賴典型的操作包括groupByKey,sortByKey等。對於連接(join)操作,可以分為兩種情況。

  1. 對輸入進行協同划分,屬於窄依賴。所謂協同划分(co-partitioned)是指多個父RDD的某一分區的所有“鍵(key)”,落在子RDD的同一個分區內,不會產生同一個父RDD的某一分區,落在子RDD的兩個分區的情況。
  2. 對輸入做非協同划分,屬於寬依賴。

對於窄依賴的RDD,可以以流水線的方式計算所有父分區,不會造成網絡之間的數據混合。對於寬依賴的RDD,則通常伴隨着Shuffle操作,即首先需要計算好所有父分區數據,然后在節點之間進行Shuffle。

Spark的這種依賴關系設計,使其具有了天生的容錯性,大大加快了Spark的執行速度。因為,RDD數據集通過“血緣關系”記住了它是如何從其它RDD中演變過來的,血緣關系記錄的是粗顆粒度的轉換操作行為,當這個RDD的部分分區數據丟失時,它可以通過血緣關系獲取足夠的信息來重新運算和恢復丟失的數據分區,由此帶來了性能的提升。相對而言,在兩種依賴關系中,窄依賴的失敗恢復更為高效,它只需要根據父RDD分區重新計算丟失的分區即可(不需要重新計算所有分區),而且可以並行地在不同節點進行重新計算。而對於寬依賴而言,單個節點失效通常意味着重新計算過程會涉及多個父RDD分區,開銷較大。此外,Spark還提供了數據檢查點和記錄日志,用於持久化中間RDD,從而使得在進行失敗恢復時不需要追溯到最開始的階段。在進行故障恢復時,Spark會對數據檢查點開銷和重新計算RDD分區的開銷進行比較,從而自動選擇最優的恢復策略。

1.5、階段的划分

Spark通過分析各個RDD的依賴關系生成了DAG,再通過分析各個RDD中的分區之間的依賴關系來決定如何划分階段。
具體划分方法是:
在DAG中進行反向解析,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到當前的階段中;
將窄依賴盡量划分在同一個階段中,可以實現流水線計算。例如,假設從HDFS中讀入數據生成3個不同的RDD(即A、C和E),通過一系列轉換操作后再將計算結果保存回HDFS。對DAG進行解析時,在依賴圖中進行反向解析,由於從RDD A到RDD B的轉換以及從RDD B和F到RDD G的轉換,都屬於寬依賴,因此,在寬依賴處斷開后可以得到三個階段,即階段1、階段2和階段3。可以看出,在階段2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,比如,分區7通過map操作生成的分區9,可以不用等待分區8到分區10這個轉換操作的計算結束,而是繼續進行union操作,轉換得到分區13,這樣流水線執行大大提高了計算的效率。

由上述論述可知,把一個DAG圖划分成多個“階段”以后,每個階段都代表了一組關聯的、相互之間沒有Shuffle依賴關系的任務組成的任務集合。每個任務集合會被提交給任務調度器(TaskScheduler)進行處理,由任務調度器將任務分發給Executor運行。

1.6、RDD在Spark架構中的運行過程

(1)創建RDD對象;
(2)SparkContext負責計算RDD之間的依賴關系,構建DAG;
(3)DAGScheduler負責把DAG圖分解成多個階段,每個階段中包含了多個任務,每個任務會被任務調度器分發給各個工作節點(Worker Node)上的Executor去執行。

 1.7、創建RDD

  • 並行化集合
    • 這種 RDD 純粹是為了學習,將內存中的集合變量轉換為 RDD,沒太大實際意義。
    • val conf: SparkConf = new SparkConf()
          conf.setMaster("local[*]")
            .setAppName("wordCount")
          val context: SparkContext = new SparkContext(conf)
      
          val sourceRDD: RDD[String] = context.parallelize(
            List(
              "a b c d e",
              "a b c d",
              "a b c",
              "a b",
              "a")
          )
  • 從 HDFS 中讀取,這種生成 RDD 的方式是非常常用的
    • val conf: SparkConf = new SparkConf()
          conf.setMaster("local[*]")
            .setAppName("wordCount")
          val context: SparkContext = new SparkContext(conf)
          context.textFile("hdfs://namenode:8020/user/me/wiki.txt")
  • 從外部數據源讀取
    • Spark 從 MySQL 中讀取數據返回的 RDD 類型是 JdbcRDD,顧名思義,是基於 JDBC 讀取數據的,這點與 Sqoop 是相似的,但不同的是 JdbcRDD 必須手動指定數據的上下界,也就是以 MySQL 表某一列的最值作為切分分區的依據。
    • val conf: SparkConf = new SparkConf()
          conf.setMaster("local[*]")
            .setAppName("wordCount")
          val context: SparkContext = new SparkContext(conf)
          val lowerBound = 1
          val upperBound = 1000
          val numPartition = 10
          val rdd = new JdbcRDD(context,() => {
            Class.forName("com.mysql.jdbc.Driver").newInstance()
            DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
          },
            "SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
            lowerBound,
            upperBound,
            numPartition,
            r => r.getString(1)
          )
    • 既然是基於 JDBC 進行讀取,那么所有支持 JDBC 的數據庫都可以通過這種方式進行讀取,也包括支持 JDBC 的分布式數據庫,但是需要注意的是,從代碼可以看出,這種方式的原理是利用多個 Executor 同時查詢互不交叉的數據范圍,從而達到並行抽取的目的。但是這種方式的抽取性能受限於 MySQL 的並發讀性能,單純提高 Executor 的數量到某一閾值后,再提升對性能影響不大。
    • 上面的是通過 JDBC 讀取數據庫的方式,對於 HBase 這種分布式數據庫來說,情況有些不同,HBase 這種分布式數據庫,在數據存儲時也采用了分區的思想,HBase 的分區名為 Region,那么基於 Region 進行導入這種方式的性能就會比上面那種方式快很多,是真正的並行導入。
    • //val spark: SparkSession = .......
      val sc = spark.sparkcontext
      val tablename = "your_hbasetable"  
      val conf = HBaseConfiguration.create()  
      conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")  
      conf.set("hbase.zookeeper.property.clientPort", "2181")  
      conf.set(TableInputFormat.INPUT_TABLE, tablename)  
      val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
      classOf[org.apache.hadoop.hbase.client.Result]) 
      // 利用HBase API解析出行鍵與列值
      rdd_three.foreach{case (_,result) => {    
          val rowkey = Bytes.toString(result.getRow)  
          val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))
      }
    • HBase 有一個第三方組件叫 Phoenix,可以讓 HBase 支持 SQL 和 JDBC,在這個組件的配合下,第一種方式也可以用來抽取 HBase 的數據,此外,Spark 也可以讀取 HBase 的底層文件 HFile,從而直接繞過 HBase 讀取數據。
    • 通過第三方庫的支持,Spark 幾乎能夠讀取所有的數據源,例如 Elasticsearch。

 1.8、PairRDD

PairRDD 與其他 RDD 並無不同,只不過它的數據類型是 Tuple2[K,V],表示鍵值對,因此這種 RDD 也被稱為 PairRDD,泛型為 RDD[(K,V)],而普通 RDD 的數據類型為 Int、String 等。這種數據結構決定了 PairRDD 可以使用某些基於鍵的算子,如分組、匯總等。PairRDD 可以由普通 RDD 轉換得到:

//val spark: SparkSession = .......
val a = spark.sparkcontext.textFile("/user/me/wiki").map(x => (x,x))

二、Transformations算子

下面列出了Spark常用的transformation操作。詳細的細節請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java)。

  • map(func)
    • 將原來RDD的每個數據項,使用map中用戶自定義的函數func進行映射,轉變為一個新的元素,並返回一個新的RDD。
  • filter(func)
    • 使用函數func對原RDD中數據項進行過濾,將符合func中條件的數據項組成新的RDD返回。
  • flatMap(func)
    • 類似於map,但是輸入數據項可以被映射到0個或多個輸出數據集合中,所以函數func的返回值是一個數據項集合而不是一個單一的數據項。
  • mapPartitions(func)
    • 類似於map,但是該操作是在每個分區上分別執行,所以當操作一個類型為T的RDD時func的格式必須是Iterator<T> => Iterator<U>。即mapPartitions需要獲取到每個分區的迭代器,在函數中通過這個分區的迭代器對整個分區的元素進行操作。
  • mapPartitionsWithIndex(func)
    • 類似於mapPartitions,但是需要提供給func一個整型值,這個整型值是分區的索引,所以當處理T類型的RDD時,func的格式必須為(Int, Iterator<T>) => Iterator<U>。
  • union(otherDataset)
    • 返回原數據集和參數指定的數據集合並后的數據集。使用union函數時需要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合並的RDD元素數據類型相同。該操作不進行去重操作,返回的結果會保存所有元素。如果想去重,可以使用distinct()。
  • intersection(otherDataset)
    • 返回兩個數據集的交集。
  • distinct([numTasks]))
    • 將RDD中的元素進行去重操作。
  • groupByKey([numTasks])
    • 操作(K,V)格式的數據集,返回 (K, Iterable)格式的數據集。
    • 注意,如果分組是為了按key進行聚合操作(例如,計算sum、average),此時使用reduceByKey或aggregateByKey計算效率會更高。
    • 注意,默認情況下,並行情況取決於父RDD的分區數,但可以通過參數numTasks來設置任務數。
  • reduceByKey(func, [numTasks])
    • 使用給定的func,將(K,V)對格式的數據集中key相同的值進行聚集,其中func的格式必須為(V,V) => V。可選參數numTasks可以指定reduce任務的數目。
  • aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])
    • 對(K,V)格式的數據按key進行聚合操作,聚合時使用給定的合並函數和一個初試值,返回一個(K,U)對格式數據。需要指定的三個參數:zeroValue為在每個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變量;seqOp用於在每個分區中,相同的key中V類型的值合並到zeroValue創建的U類型的變量中。combOp是對重新分區后兩個分區中傳入的U類型數據的合並函數。
  • sortByKey([ascending], [numTasks])
    • (K,V)格式的數據集,其中K已實現了Ordered,經過sortByKey操作返回排序后的數據集。指定布爾值參數ascending來指定升序或降序排列。
  • join(otherDataset, [numTasks])
    • 用於操作兩個鍵值對格式的數據集,操作兩個數據集(K,V)和(K,W)返回(K, (V, W))格式的數據集。通過leftOuterJoin、rightOuterJoin、fullOuterJoin完成外連接操作。
  • cogroup(otherDataset, [numTasks])
    • 用於操作兩個鍵值對格式數據集(K,V)和(K,W),返回數據集格式為 (K,(Iterable, Iterable)) 。這個操作也稱為groupWith。對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分別聚合為一個集合,並且返回兩個RDD中對應Key的元素集合的迭代器。
  • cartesian(otherDataset)
    • 對類型為T和U的兩個數據集進行操作,返回包含兩個數據集所有元素對的(T,U)格式的數據集。即對兩個RDD內的所有元素進行笛卡爾積操作。
  • pipe(command, [envVars])
    • 以管道(pipe)方式將 RDD的各個分區(partition)使用 shell命令處理(比如一個 Perl或 bash腳本)。 RDD的元素會被寫入進程的標准輸入(stdin),將進程返回的一個字符串型 RDD(RDD of strings),以一行文本的形式寫入進程的標准輸出(stdout)中。
  • coalesce(numPartitions)
    • 把RDD的分區數降低到通過參數numPartitions指定的值。在得到的更大一些數據集上執行操作,會更加高效。
  • repartition(numPartitions)
    • 隨機地對RDD的數據重新洗牌(Reshuffle),從而創建更多或更少的分區,以平衡數據。總是對網絡上的所有數據進行洗牌(shuffles)。
  • repartitionAndSortWithinPartitions(partitioner)
    • 根據給定的分區器對RDD進行重新分區,在每個結果分區中,按照key值對記錄排序。這在每個分區中比先調用repartition再排序效率更高,因為它可以將排序過程在shuffle操作的機器上進行。

3、Actions算子

下面列出了Spark支持的常用的action操作。詳細請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java)。

  • reduce(func)
    • 使用函數func聚集數據集中的元素,這個函數func輸入為兩個元素,返回為一個元素。這個函數應該符合結合律和交換了,這樣才能保證數據集中各個元素計算的正確性。
  • collect()
    • 在驅動程序中,以數組的形式返回數據集的所有元素。通常用於filter或其它產生了大量小數據集的情況。
  • count()
    • 返回數據集中元素的個數。
  • first()
    • 返回數據集中的第一個元素(類似於take(1))。
  • take(n)
    • 返回數據集中的前n個元素。
  • takeOrdered(n, [ordering])
    • 返回RDD按自然順序或自定義順序排序后的前n個元素。
  • saveAsTextFile(path)
    • 將數據集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。Spark將在每個元素上調用toString方法,將數據元素轉換為文本文件中的一行記錄。
  • saveAsSequenceFile(path) (Java and Scala)
    • 將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。該操作只支持對實現了Hadoop的Writable接口的鍵值對RDD進行操作。在Scala中,還支持隱式轉換為Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等等)。
  • saveAsObjectFile(path) (Java and Scala)
    • 將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些保存該數據的文件,可以使用SparkContext.objectFile()進行加載。
  • countByKey()
    • 僅支持對(K,V)格式的鍵值對類型的RDD進行操作。返回(K,Int)格式的Hashmap,(K,Int)為每個key值對應的記錄數目。
  • foreach(func)
    • 對數據集中每個元素使用函數func進行處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM