Spark的RDD原理以及2.0特性的介紹


轉載自:http://www.tuicool.com/articles/7VNfyif

王聯輝,曾在騰訊,Intel 等公司從事大數據相關的工作。2013 年 - 2016 年先后負責騰訊 Yarn 集群和 Spark 平台的運營與研發。曾負責 Intel Hadoop 發行版的 Hive 及 HBase 版本研發。參與過百度用戶行為數據倉庫的建設和開發,以及淘寶數據魔方和淘寶指數的數據開發工作。給 Spark 社區貢獻了 25+ 個 patch,接受的重要特性有 python on yarn-cluster,yarn-cluster 支持動態資源擴縮容以及 RDD Core 中 memory 管理等。

Spark 是什么 

Spark 是 Apache 頂級項目里面最火的大數據處理的計算引擎,它目前是負責大數據計算的工作。包括離線計算或交互式查詢、數據挖掘算法、流式計算以及圖計算等。全世界有許多公司和組織使用或給社區貢獻代碼,社區的活躍度見 www.github.com/apache/spark。

2013 年開始 Spark開發團隊成立 Databricks,來對 Spark 進行運作和管理,並提供 Cloud 服務。Spark 社區基本保持一個季度一個版本,不出意外的話 Spark 2.0 將在五月底發布。

與 Mapreduce 相比,Spark 具備 DAG 執行引擎以及基於內存的多輪迭代計算等優勢,在SQL 層面上,比 Hive/Pig 相比,引入關系數據庫的許多特性,以及內存管理技術。另外在 Spark 上所有的計算模型最終都統一基於 RDD 之上運行執行,包括流式和離線計算。Spark 基於磁盤的性能是 MR 的 10 倍,基於內存的性能是 MR 的 100 倍  。

Spark 提供 SQL、機器學習庫 MLlib、流計算 Streaming 和圖計算 Graphx,同時也支持 Scala、Java、Python 和 R 語言開發的基於 API 的應用程序。

RDD 的原理

RDD,英文全稱叫 Resilient Distributed Datasets。

an RDD is a read-only, partitioned collection of records. 字面意思是只讀的分布式數據集。

但其實個人覺得可以把 RDD 理解為關系數據庫 里的一個個操作,比如 map,filter,Join 等。在 Spark 里面實現了許多這樣的 RDD 類,即可以看成是操作類。當我們調用一個 map 接口,底層實現是會生成一個 MapPartitionsRDD 對象,當 RDD 真正執行時,會調用 MapPartitionsRDD 對象里面的 compute 方法來執行這個操作的計算邏輯。但是不同的是,RDD 是 lazy 模式,只有像 count,saveasText 這種 action 動作被調用后再會去觸發 runJob 動作。

RDD 分為二類:transformation 和 action。

transformation 是從一個 RDD 轉換為一個新的 RDD 或者從數據源生成一個新的 RDD;

action 是觸發 job 的執行。所有的 transformation 都是 lazy 執行,只有在 action 被提交的時候才觸發前面整個 RDD 的執行圖。如下

val file = sc.textFile(args(0))

val words = file.flatMap(line => line.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _, 2) wordCounts.saveAsTextFile(args(1))

這段代碼生成的 RDD 的執行樹是如下圖所示:

最終在 saveAsTextFile 方法時才會將整個 RDD 的執行圖提交給 DAG 執行引擎,根據相關信息切分成一個一個 Stage,每個 Stage 去執行多個 task,最終完成整個 Job 的執行。

還有一個區別就是,RDD 計算后的中間結果是可以被持久化,當下一次需要使用時,可以直接使用之前持久化好的結果,而不是重新計算,並且這些結果被存儲在各個結點的 executor 上。下一次使用時,調度器可以直接把 task 分發到存儲持久化數據的結點上,減少數據的網絡傳輸開稍。這種場景在數據挖掘迭代計算是經常出現。如下代碼

val links = spark.textFile(...).map(...).persist() var ranks = // RDD of (URL, rank) pairs

for (i <- 1 to ITERATIONS) {

// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {

(url, (links, rank)) =>

links.map(dest => (dest, rank/links.size)) }

// Sum contributions by URL and get new ranks

ranks = contribs.reduceByKey((x,y) => x+y)

.mapValues(sum => a/N + (1-a)*sum) }

以上代碼生成的 RDD 執行樹如下圖所示:

計算 contribs-0 時需要使用 links 的計算邏輯,當 links 每個分片計算完后,會將這個結果保存到本地內存或磁盤上,下一次 contribs-1 計算要使用 links 的數據時,直接從上一次保存的內存和磁盤上讀取就可以了。這個持久化系統叫做 blockManager,類似於在內部再構建了一個 KV 系統,K 表示每個分區 ID 號,V 表示這個分區計算后的結果。

另外在 streaming 計算時,每個 batch 會去消息隊列上拉取這個時間段的數據,每個 Recevier 接收過來數據形成 block 塊並存放到 blockManager 上,為了可靠性,這個 block 塊可以遠程備份,后續的 batch 計算就直接在之前已讀取的 block 塊上進行計算,這樣不斷循環迭代來完成流處理。

一個 RDD 一般會有以下四個函數組成。

1. 操作算子的物理執行邏輯

定義為:

def compute(split: Partition, context: TaskContext): Iterator[T]

如在 MapPartitionsRDD 里的實現是如下:

override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))

函數定義

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

2. 獲取分片信息

protected def getPartitions: Array[Partition] 

即這個操作的數據划分為多少個分 區。跟 mapreduce 里的 map 上的 split 類似的。

3. 獲取父 RDD 的依賴關系

protected def getDependencies: Seq[Dependency[_]] 

依賴分二種:如果 RDD 的每個分區最多只能被一個 Child RDD 的一個分區使用,則稱之為 narrow dependency;若依賴於多個 Child RDD 分區,則稱之為 wide dependency。不同的操作根據其特性,可能會產生不同的依賴 。如下圖所示

map 操作前后二個 RDD 操作之間的分區是一對一的關系,故產生 narrow dependency,而 join 操作的分區分別對應於它的二個子操作相對應的分區,故產生 wide dependency。當最后要生成具體的 task 運行時,就需要利用這個依賴關系也生成 Stage 的 DAG 圖。

4. 獲取該操作對應數據的存放位置信息,主要是針對 HDFS 這類有數據源的 RDD。

protected def getPreferredLocations(split: Partition): Seq[String]

Spark 的執行模式

Spark 的執行模式有 local、Yarn、Standalone、Mesos 四類。后面三個分別有 cluster 和 client 二種。client 和 cluster 的區別就是指 Driver 是在程序提交客戶端還是在集群的 AM 上。 比如常見的 Yarn-cluster 模式如下圖所示:

一般來說,運行簡單測試或 UT 用的是 local 模式運行,其實就是用多線程模似分布式執行。 如果業務部門較少且不需要對部門或組之間的資源做划分和優先級調度的話,可以使用 Standalone 模式來部署。

當如果有多個部門或組,且希望每個組織可以限制固定運行的最大資源,另外組或者任務需要有優先級執行的話,可以選擇 Yarn 或 Mesos。

Spark 2.0 的特性

Unifying DataFrames and Datasets in Scala/Java

DataFrame 和 Dataset的功能是什么?

它們都是提供給用戶使用,包括各類操作接口的 API。1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是將二者統一,即保留 Dataset,而把 DataFrame 定義為 Dataset[Row],即是 Dataset 里的元素對象為 Row 的一種(SPARK-13485)。

在參考資料中有介紹 DataFrame,它就是提供了一系列操作 API,與 RDD API 相比較,DataFrame 里操作的數據都是帶有 Schema 信息,所以 DataFrame 里的所有操作是可以享受 Spark SQL Catalyst optimizer 帶來的性能提升,比如 code generation 以及 Tungsten等。執行過程如下圖所示

但是 DataFrame 出來后發現有些情況下 RDD 可以表達的邏輯用 DataFrame 無法表達。比如 要對 group by 或 join 后的結果用自定義的函數,可能用 SQL 是無法表達的。如下代碼:

case class ClassData(a: String, b: Int)

case class ClassNullableData(a: String, b: Integer)

val ds = Seq(ClassData("a", 1), ClassData("a", 2)).toDS()

val agged = ds.groupByKey(d => ClassNullableData(d.a, null))

.mapGroups {

case (key, values) => key.a + values.map(_.b).sum

}

中間處理過程的數據是自定義的類型,並且 groupby 后的聚合邏輯也是自定義的,故用 SQL 比較難以表達,所以提出了 Dataset API。Dataset API 擴展 DataFrame API 支持靜態類型和運行已經存在的 Scala 或 Java 語言的用戶自定義函數。同時 Dataset 也能享受 Spark SQL 里所有性能 帶來的提升。

那么后面發現 Dataset 是包含了 DataFrame 的功能,這樣二者就出現了很大的冗余,故在 2.0 時將二者統一,保留 Dataset API,把 DataFrame 表示為 Dataset[Row],即 Dataset 的子集。

因此我們在使用 API 時,優先選擇 DataFrame & Dataset,因為它的性能很好,而且以后的優化它都可以享受到,但是為了兼容早期版本的程序,RDD API 也會一直保留着。后續 Spark 上層的庫將全部會用 DataFrame,比如 MLlib、Streaming、Graphx 等。

Whole-stage code generation

在參考資料 9 中有幾個例子的代碼比較,我們看其中一個例子:

select count(*) from store_sales where ss_item_sk = 1000

那么在翻譯成計算引擎的執行計划如下圖:

而通常物理計划的代碼是這樣實現的:

class Filter {

def next(): Boolean = {

var found = false

while (!found && child.next()) {

found = predicate(child.fetch())

}

return found

}

def fetch(): InternalRow = {

child.fetch()

}...

}

但是真正如果我們用 hard code 寫的話,代碼是這樣的:

var count = 0

for (ss_item_sk in store_sales) {

if (ss_item_sk == 1000) {

count += 1

}

}

發現二者相關如下圖所示:

那么如何使得計算引擎的物理執行速度能達到 hard code 的性能呢?這就提出了 whole-stage code generation,即對物理執行的多次調用轉換為代碼 for 循環,類似 hard code 方式,減少中間執行的函數調用次數,當數據記錄多時,這個調用次數是很大。 最后這個優化帶來的性能提升如下圖所示:

從 benchmark 的結果可以看出,使用了該特性后各操作的性能都有很大的提升。

Structured Streaming

Spark Streaming 是把流式計算看成一個一個的離線計算來完成流式計算,提供了一套 Dstream 的流 API,相比於其他的流式計算,Spark Streaming 的優點是容錯性和吞吐量上要有優勢,關於 Spark Streaming 的詳細設計思想和分析,可以到 https://github.com/lw-lin/CoolplaySpark 進行詳細學習和了解。

在 2.0 以前的版本,用戶在使用時,如果有流計算,又有離線計算,就需要用二套 API 去編寫程序,一套是 RDD API,一套是 Dstream API。而且 Dstream API 在易用性上遠不如 SQL 或 DataFrame。

為了真正將流式計算和離線計算在編程 API 上統一,同時也讓 Streaming 作業能夠享受 DataFrame/Dataset 上所帶來的優勢:性能提升和 API 易用,於是提出了 Structured Streaming。最后我們只需要基於 DataFrame/Dataset 可以開發離線計算和流式計算的程序,很容易使得 Spark 在 API 跟業界所說的 DataFlow 來統一離線計算和流式計算效果一樣。

比如在做 Batch Aggregation 時我們可以寫成下面的代碼

那么對於流式計算時,我們僅僅是調用了 DataFrame/Dataset 的不同函數代碼,如下:

最后,在 DataFrame/Dataset 這個 API 上可以完成如下圖所示的所有應用:

其他主要性能提升

采用 vectorized Parquet decoder 讀取 parquet 上數據。以前是一行一行的讀取,然后處理。現在改為一次讀取 4096 行記錄,不需要每處理一行記錄去調用一次 Parquet 獲取記錄的方法,而是改為一批去調用一次(SPARK-12854)。加上 Parquet 本身是列式存儲,這個優化使得 Parquet 讀取速度提高 3 倍。

采有 radix sort 來提高 sort 的性能(SPARK-14724)。在某些情況下排序性能可以提高 10-20 倍。

使用 VectorizedHashmap 來代替 Java 的 HashMap 加速 group by 的執行(SPARK-14319)。

將 Hive 中的 Window 函數用 Native Spark Window 實現,因為 Native Spark Window 在內存管理上有優勢(SPARK-8641)。

避免復雜語句中的邏輯相同部分在執行時重復計算(SPARK-13523)。

壓縮算法默認使用 LZ4(SPARK-12388)。

語句的增強

建立新的語法解析(SPARK-12362)滿足所有的 SQL 語法,這樣即合並 Hive 和標准 SQL 的語句解析,同時不依賴 Hive 的語法解析 jar(SPARK-14776)。之前版本二者的語法解析是獨立的,這樣導致在標准 SQL 中無法使用窗口函數或者 Hive 的語法,而在使用 Hive 語法時無法使用標准 SQL 的語法,比如 In/Exists 子句等。在 SQL 編寫時,沒法在一個 Context 把二者的范圍全部支持,然而有了這個特性后,使得 SQL 語句表達更強大,后續要增加任何語法,只需要維護這一個語法解析即可。當然缺點是后續 Hive 版本的新語法,需要手動添加進來。

支持 intersect/except(SPARK-12542)。如 select * from t1 except select * from t2 或者 select * from t1 intersect select * from t2。

支持 uncorrelated scalar subquery(SPARK-13417)。如 select (select min(value) from testData where key = (select max(key) from testData) - 1)。

支持 DDL/DML(SPARK-14118)。之前 DDL/DML 語句是調用 Hive 的 DDL/DML 語句命令來完成,而現在是直接在 Spark SQL 上就可以完成。

支持 multi-insert(SPARK-13924)。

支持 exist(SPARK-12545)和 NOT EXISTS(SPARK-10600),如 select * from (select 1 as a union all select 2 as a) t where exists (select * from (select 1 as b) t2 where b = a and b < 2)。

支持 subqueries 帶有 In/Not In 子句(SPARK-4226),如 select * from (select 1 as a union all select 2 as a) t where a in (select b as a from t2 where b < 2)。

支持 select/where/having 中使用 subquery(SPARK-12543),如 select * from t where a = (select max(b) from t2) 或 select max(a) as ma from t having ma = (select max(b) from t2)。

支持 LeftSemi/LeftAnti(SPARK-14853)。

支持在條件表達式 In/Not In 里使用子句(SPARK-14781),如 select * from l where l.a in (select c from r) or l.a in (select c from r where l.b < r.d)。

支持所有的 TPCDS 語句(SPARK-12540)。

與以前版本兼容(SPARK-11806)

不支持運行在 Hadoop 版本 < 2.2 上(SPARK-11807)。

去掉 HTTPBroadcast(SPARK-12588)。

去掉 HashShuffleManager(SPARK-14667)。

去掉 Akka RPC。

簡化與完善 accumulators and task metrics(SPARK-14626)。

將 Hive 語法解析以及語法移至 Core 里(SPARK-14825),在沒有 Hive 元數據庫和 Hive 依賴包時,我們可以像之前版本使用標准 SQL 一樣去使用 HiveQL 語句。

1.6 版本嚴重問題的解決

在 http://geek.csdn.net/news/detail/70162 提到的 1.6 問題中 Spillable 集合內存溢出問題在 SPARK-4452 里已解決,BlockManager 死鎖問題在 SPARK-12757 里已解決。

最后 2.0 版本還有一些其他的特性,如:

用 SparkSession 替換掉原來的 SQLContext and HiveContext。

mllib 里的計算用 DataFrame-based API 代替以前的 RDD 計算邏輯。

提供更多的 R 語言算法。

默認使用 Scala 2.11 編譯與運行。

參考資料

http://spark.apache.org/

https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html

http://www.eecs.berkeley.edu/Pubs/TechRpts/2014/EECS-2014-12.pdf

http://www.infoq.com/cn/articles/spark-core-rdd

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

http://www.slideshare.net/databricks/spark-summit-eu-2015-spark-dataframes-simple-and-fast-analysis-of-structured-data

https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6122906529858466/293651311471490/5382278320999420/latest.html

http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data

http://www.slideshare.net/rxin/the-future-of-realtime-in-spark

Q&A

1.  Hive 遷移到 Spark SQL 怎么遷移?有哪些坑?

王聯輝:一般 hive 執行有二種模式,一種是客戶端,一種是 HiveServer 模式。

對於客戶端命令行的話,直接把以前的 Hive 命令,改成 spark-sql 命令行+SQL 語句即可。

對於 HiveServer 的方式,Spark SQL 里提供 Hive thriftServer,可以使用這種方式。

2.  Spark 未來在內存管理上的優化應該是什么方向?有 RDD sharing on multiple application 計划嗎?用 Spark 感覺內存調優需要花很大功夫。

王聯輝:從 1.5 的 Tungsten 項目到 1.6,Spark 最大的一個亮點就是內存優化,所以 1.6 開始的版本不管數據量多大都完全沒問題,而且性能也比 MR 要快幾倍。

未來應該也不會在 Spark 內部去做 RDD sharing on multiple application,因為這其實是 Job server 去做的事情。1.6 的動態內存管理出來后,內存調優稍微簡單化了,因為 executor 和 storage 的內存可以動態調整,當然如果用 RDD 的 API,可能內存占用會大一些,因此后續推薦 DataFrame > Dataset > RDD api。

3.  Spark Streaming 在 checkpoint 后自動恢復,怎樣做到 exactly once 而且恢復速度快?

王聯輝:在出現故障時,Spark Streaming 本身是沒辦法做到完全 exactly once,這是一個事務性的問題。說白了,就是跟 Storm 做比較,Streaming 的優勢是吞吐量高,也就是一段間隔內執行時間快。

為什么?首先 Storm 如果要保證消息一定要被處理的話,每一條消息都需要有一個 ack 的附加消息,這個會導致網絡壓力大,而且消息之間的時序性是亂的。

而在 Spark Streaming,每次是小批量數據處理,中間過程不需要 ack,一旦失敗,則重新執行這個小批量數據。

具體可以看一下這篇文章:http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data

4. Spark 的應用場景有哪些?可以舉些實際應用場景例子嗎?

王聯輝:Spark 的理念是在一個 RDD 計算框架之上可以滿足各種應用,比如 HiveQL,MapReduce。除此之外,還有機器學習挖掘,流計算和圖計算。

其實應該是你有什么樣的計算場景,然后首先看 Spark 可不可以滿足,再來考慮別的計算模式。

5.  請問搭建 Spark+HDFS 這樣的集群一般硬件怎么選擇?CPU,內存,磁盤……

王聯輝:Spark 其實跟 HDFS 沒有啥依賴關系,所以邏輯是可以獨立搭建,但是物理上是在一台機器,CPU 跟內存是按比例搭配的,一般比如一個核是 4 - 6G,一台機器有 12 虛擬核的話,內存就可以配 64G,當然配 128G 內存也行,這樣一個核的內存用的更多,作業跑的就更快。

6:  RDD 在迭代的過程中,已經計算完成的 RDD 是何時釋放?

王聯輝:這個分二部分,一個是 RDD 的元信息,一個是 RDD 的數據。

元信息的話,當沒有人引用它了,在每個 job 執行完后就會自動釋放。

RDD 的數據,非 shuffle 的數據如果被持久化了,是需要用戶調用 unpersist 手動釋放。

shuffle 數據在 executor 丟失時會自動冊除,這個一般 Yarn/mesos 會去完成這個工作。

7.  Spark 2.0 中的新特性 Structured Streaming ,它的設計初衷是什么?使用場景是什么?

王聯輝:設計初衷就是把 Spark SQL 跟 Streaming 統一,讓 Streaming 任務可以用 Dataset 來表達,同時底下的執行可以用 Spark SQL 引擎。這樣在易用性和性能上都有提升。

之前的 Dstreaming API 能做的事情,都可以用 Dataset 去寫 Streaming 任務,最后離線計算和流式計算都用 Dataset 去編寫,達到 dataflow 的效果。

8.   Spark Streaming 作業在運行的過程中出現錯誤(比如,集群掛掉半小時后恢復),在重試 N 次后不能自動恢復運行?

王聯輝:首先可以需要解決的是集群為啥會掛掉半小時,是 HDFS 還是 yarn 的問題。假設不是 Spark Streaming 的問題,那么運行中出現錯誤,一般在二個地方會出錯,一個是 Receiver 采集數據時,一個是每個小 batch 計算時。

第一種情況,可能是你集群的 storage 不夠了,把前面沒有計算完的 block 刪除了,這種一般建議將 block 存儲加上磁盤,這樣即時內存不夠,可以刷磁盤。

第二種情況的話,就跟離線計算一樣,是任務執行出錯了,是不是數據傾斜了導致內存占用太高了,那么建議 partition 數設大一些。

9. Hive on Spark 生產環境穩定性如何?Spark Streaming 滑動窗口大於 1 小時以上的發現性能很低。大神那邊是怎么處理的或者有啥好的解決方案優化方案?

王聯輝:我之前沒有在生產使用 Hive on Spark,所以不做回答。

第二個問題,滑動窗口大於 1 小時以上,需要看具體的業務,因為你這個窗口時間比較大,可能數據也比較大,可能會導致集群的內存無法存放,建議的話,還不如先 5 分鍾計算一次,然后到一個小時再前面 5 分鍾的數據做個合並。這個在計算一個小時內 distinct 值時無法滿足。

10. Spark job 調度管理有啥好的方案嗎?azkaban 支持度如何?還有好的方式推薦嗎?

王聯輝:在上層作業調度來看,其實 Spark 作業跟 MR 作業沒有太大的區別,調度一個 Spark 任務跟調度一個 MR 是一樣的。因為據我所知,很多公司都會自已開發一套作業管理系統。所以我對這方面的開源系統也不是很了解。

11. Spark Streaming 和 Storm 比較,各自應用場景和優缺點是什么?

王聯輝:應用場景我就不回答,我覺得應該反過來問,比如我有這樣的應用場景,Spark Streaming 或 Storm 哪個更優?

優缺點的話,Spark Streaming 社區活躍度要高很多,遇到問題查找答案要容易,相反 Storm 這個優勢會弱一點,特別是 clojure 語言。

當然 Storm 在實時性方面比 Spark Streaming 要好,比如你要求在 1 秒或者毫秒內計算出結果,Storm 可能會比較容易做到,而 Spark Streaming 各方面的開稍使得當前可能達不到這個要求。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM