前言
在Spark的使用中,性能的調優配置過程中,查閱了很多資料,本文的思路是從spark最細節的本質,即核心的數據結構RDD出發,到整個Spark集群宏觀的調度過程做一個整理歸納,從微觀到宏觀兩方面總結,方便自己在調優過程中找尋問題,理清思路,也加深自己對於分布式程序開發的理解。(有任何問題和紕漏還請各位大牛指出啦,我會第一時間改正)
RDD詳談
在Spark開山之作"Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing"的這篇paper中(以下簡稱RDD Paper),Matei等提出了RDD這種數據結構,文中開頭對RDD定義是:
A distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
也就是說RDD設計的核心點為:
- 內存計算
- 適合於計算機集群
- 有容錯方式
文中提到了對於RDD設計的最大挑戰便是在提供有效的容錯機制(fault tolerance efficiently),之前存在的基於內存存儲的集群抽象,例如分布式共享內存、鍵值存儲、數據庫等,更多是細粒度的(fine-grained)更新一個可變狀態表,而其容錯方式通常為在機器間進行數據復制或者日志更新,而這些方式很明顯會造成機器負載加大以及大量的網絡傳輸開銷。
而RDD則使用了粗粒度的(coarse-grained)轉換,即對於很多相同的數據項使用同一種操作(如map/filter/join),這種方式能夠通過記錄RDD之間的轉換從而刻畫RDD的繼承關系(lineage),而不是真實的數據,最終構成一個DAG(有向無環圖),而如果發生RDD丟失,RDD會有充足的信息來得知怎么從其他RDDs重新計算得到。
這也是RDD設計的核心理念,接下來圍繞這一理念我們來剖析,看RDD是怎么實現這種高效的容錯機制的。
RDD存儲結構
RDD實現的數據結構核心是一個五元組,如下表:
屬性 | 說明 |
---|---|
分區列表-partitions | 每個分區為RDD的一部分數據 |
依賴列表-dependencies | table存儲其父RDD即依賴RDD |
計算函數-compute | 利用父分區計算RDD各分區的值 |
分區器-partitioner | 指明RDD的分區方式(hash/range) |
分區位置列表-preferredLocations | 指明分區優先存放的結點位置 |
其中每個屬性的代碼如下:
// RDD中的依賴關系由一個Seq數據集來記錄,這里使用Seq的原因是經常取第一個元素或者遍歷
private var dependencies_: Seq[Dependency[_]] = null
// 分區列表定義在一個數組中,這里使用Array的原因是隨時使用下標來訪問分區內容
// @transient分區列表不需要被序列化
@transient private var partitions_: Array[Partition] = null
// 接口定義,具體由子類實現,對輸入的RDD分區進行計算
def compute(split: Partition, context: TaskContext): Iterator[T]
// 分區器
// 可選,子類可以重寫以指定新的分區方式,Spark支持Hash和Range兩種分區方式
@transient val partitioner: Option[Partitioner] = None
// 可選,子類可以指定分區的位置,如HadoopRDD可以重寫此方法,讓分區盡可能與數據在相同的節點上
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
在RDD Paper中,作者提到在抽象RDD時,一個很重要的點便是如何使得RDD能夠記錄RDD之間的繼承依賴關系(lineage),這種繼承關系來自豐富的轉移(Transformation)操作。所以作者提出了一種基於圖的表示方式來實現這個目標,這也正是上面RDD五種屬性的核心作用。
這五種屬性從spark誕生到新的版本迭代,一直在使用,沒有增加也沒有減少,所以可以說Spark的核心就是RDD,而RDD的核心就是這五種屬性。
RDD的操作
在Spark踩坑記——初試中對RDD的操作也進行了簡單說明,在Spark中,對RDD的操作可以分為Transformation和Action兩種,我們分別進行整理說明:
Transformation
對於Transformation操作是指由一個RDD生成新RDD的過程,其代表了是計算的中間過程,其並不會觸發真實的計算。
-
map(f:T=>U) : RDD[T]=>RDD[U]
返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成 -
filter(f:T=>Bool) : RDD[T]=>RDD[T]
返回一個新的數據集,由經過func函數后返回值為true的原元素組成 -
flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U])
類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) -
sample(withReplacement: Boolean, fraction: Double, seed: Long) : RDD[T]=>RDD[T]
sample將RDD這個集合內的元素進行采樣,獲取所有元素的子集。用戶可以設定是否有放回的抽樣、百分比、隨機種子,進而決定采樣方式。
withReplacement=true, 表示有放回的抽樣;
withReplacement=false, 表示無放回的抽樣。
如下圖:
每個方框是一個RDD分區。通過sample函數,采樣50%的數據。V1、V2、U1、U2、U3、U4采樣出數據V1和U1、U4,形成新的RDD。 -
groupByKey([numTasks]) : RDD[(K,V)]=>RDD[(K,Seq[V])]
在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:- 默認情況下,使用與父RDD的partition數量對應的並行任務進行分組,也可以傳入numTask可選參數,根據數據量設置不同數目的Task。
- 另外如果相同key的value求和或者求平均,那么使用reduceByKey性能更好
-
reduceByKey(f:(V,V)=>V, [numTasks]) : RDD[(K, V)]=>RDD[(K, V)]
在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。 -
union(otherDataset) : (RDD[T],RDD[T])=>RDD[T]
返回一個新的數據集,由原數據集和參數聯合而成 -
join(otherDataset, [numTasks]) : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(V,W))]
返回key值相同的所有匹配對,如下圖:
join操作會將兩個RDD中相同key值的合並成key,pair(value1, value2)的形式。 -
cogroup() : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(Seq[V],Seq[W]))]
cogroup函數將兩個RDD進行協同划分。對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分別聚合為一個集合,並且返回兩個RDD中對應Key的元素集合的迭代器(K, (Iterable[V], Iterable[w]))。其中,Key和Value,Value是兩個RDD下相同Key的兩個數據集合的迭代器所構成的元組。 -
cartesian(otherDataset) : (RDD[T],RDD[U])=>RDD[(T,U)]
笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,所有元素交互進行笛卡爾積。 -
sortByKey([ascending], [numTasks]) : RDD[(K,V)]=>RDD[(K,V)]
根據key值進行排序,如果ascending設置為true則按照升序排序 -
repartition(numPartitions) :
對RDD中的所有數據進行shuffle操作,建立更多或者更少的分區使得更加平衡。往往需要通過網絡進行數據傳輸
Action
不同於Transformation操作,Action代表一次計算的結束,不再產生新的RDD,將結果返回到Driver程序。所以Transformation只是建立計算關系,而Action才是實際的執行者。每個Action都會調用SparkContext的runJob方法向集群正式提交請求,所以每個Action對應一個Job。
-
count() : RDD[T]=>Long
返回數據集的元素個數 -
countByKey() : RDD[T]=>Map[T, Long]
對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每一個key對應的元素個數 -
collect() : RDD[T]=>Seq[T]
在Driver中,以數組的形式,返回數據集的所有元素。這通常會在使用filter或者其它操作並返回一個足夠小的數據子集后再使用會比較有用。 -
reduce(f:(T,T)=>T) : RDD[T]=>T
通過函數func(接受兩個參數,返回一個參數)聚集數據集中的所有元素。這個功能必須可交換且可關聯的,從而可以正確的被並行執行。 -
saveAsTextFile(path:String)
將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者任何其它hadoop支持的文件系統。對於每個元素,Spark將會調用toString方法,將它轉換為文件中的文本行 -
saveAsSequenceFile(path:String)
將數據集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支持的文件系統。這個只限於由key-value對組成,並實現了Hadoop的Writable接口,或者隱式的可以轉換為Writable的RDD。(Spark包括了基本類型的轉換,例如Int,Double,String,等等) -
saveAsObjectFile(path:String)
利用Java的Serialization接口進行持久化操作,之后可以使用SparkContext.objectFile()重新load回內存 -
take(n)
返回一個由數據集的前n個元素組成的數組。注意,這個操作目前並非並行執行,而是由驅動程序計算所有的元素 -
takeSample(withReplacement, num, [seed])
返回一個數組,在數據集中隨機采樣num個元素組成,可以選擇是否用隨機數替換不足的部分,Seed用於指定的隨機數生成器種子 -
takeOrdered(n, [ordering])
返回前n個元素,可以使用元素的自然順序,也可以使用用戶自定義comparator -
first()
返回數據集的第一個元素(類似於take(1)) -
foreach(func)
在數據集的每一個元素上,運行函數func進行更新。這通常用於邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互,例如HBase。關於foreach我在Spark踩坑記——數據庫(Hbase+Mysql)中對sparkstreaming的foreach操作有詳細整理
RDD依賴方式
RDD 的容錯機制是通過記錄更新來實現的,且記錄的是粗粒度的轉換操作。在外部,我們將記錄的信息稱為血統(Lineage)關系,而到了源碼級別,Apache Spark 記錄的則是 RDD 之間的依賴(Dependency)關系。在一次轉換操作中,創建得到的新 RDD 稱為子 RDD,提供數據的 RDD 稱為父 RDD,父 RDD 可能會存在多個,我們把子 RDD 與父 RDD 之間的關系稱為依賴關系,或者可以說是子 RDD 依賴於父 RDD。
依賴只保存父 RDD 信息,轉換操作的其他信息,如數據處理函數,會在創建 RDD 時候,保存在新的 RDD 內。依賴在 Apache Spark 源碼中的對應實現是 Dependency 抽象類。
Apache Spark 將依賴進一步分為兩類,分別是窄依賴(Narrow Dependency)和 Shuffle 依賴(Shuffle Dependency,在部分文獻中也被稱為 Wide Dependency,即寬依賴)。
窄依賴(Narrow Dependency)
窄依賴中,父 RDD 中的一個分區最多只會被子 RDD 中的一個分區使用,換句話說,父 RDD 中,一個分區內的數據是不能被分割的,必須整個交付給子 RDD 中的一個分區。下圖展示了幾類常見的窄依賴及其對應的轉換操作。
Shuffle依賴(寬依賴 Shffle/Wide Dependency)
Shuffle 依賴中,父 RDD 中的分區可能會被多個子 RDD 分區使用。因為父 RDD 中一個分區內的數據會被分割,發送給子 RDD 的所有分區,因此 Shuffle 依賴也意味着父 RDD 與子 RDD 之間存在着 Shuffle 過程。下圖展示了幾類常見的Shuffle依賴及其對應的轉換操作。
需要說明的是,依賴關系時RDD到RDD之間的一種映射關系,是兩個RDD之間的依賴,那么如果在一次操作中涉及到多個父RDD,也有可能同時包含窄依賴和Shuffle依賴,如join操作:
集群部署
組件
說到Spark集群的部署,我們先來討論一下Spark中一些關鍵的組件,在我的博文Spark踩坑記——初試中,我對Master/Worker/Driver/Executor幾個關鍵概念做了闡述。首先,先上官方文檔中的一張圖:
官方文檔對其中的術語進行了總結,如下表:
從官方文檔摘抄了這么多東東,對Spark中基本的集群結構,以及一個程序提交到Spark后的調度情況我們有了了解。
部署方式
對於集群的部署方式,Spark提供了多種集群部署方式,如下:
- Local模式:本地調試的一種模式,可以在一台機器上完成程序的運行與調試
- Standalone模式:即獨立模式,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統。
- Spark On YARN模式:將Spark搭建在Hadoop之上,由hadoop中的yarn負責資源調配,Spark負責計算任務;
- Spark On Mesos模式:這是很多公司采用的模式,官方推薦這種模式(當然,原因之一是血緣關系)。正是由於Spark開發之初就考慮到支持Mesos,因此,目前而言,Spark運行在Mesos上會比運行在YARN上更加靈活,更加自然。目前在Spark On Mesos環境中,用戶可選擇兩種調度模式之一運行自己的應用程序。
集群部署舉例
由於在我平時的使用中,是直接采用的Standalone的部署方式,我這里將部署的框架做一個簡單的介紹,其他部署方式其實可以做一些參考來進行搭配部署:
假設我們的網段為10.214.55.x,其中1、2、3機器我們用作集群節點,4和5位master節點,這里我們用到了zookeeper,關於zookeeper的介紹大家可以在網上搜搜,我們這里加入zk的目的就是master節點如果崩潰后進行一個主備切換,保證集群能夠繼續正常運行。如果我們在1提交我們的應用,那么2和3就將作為我們的worker節點參與運算。而關於配置文件中需要的具體配置項可以參考官方文檔:Spark Standalone Mode
從RDD看集群任務調度
上文我們從微觀和宏觀兩個角度對Spark進行了總結,RDD以及RDD的依賴,Spark集群以及部署,那么當我們在提交了一個任務或者說Application到Spark集群時,它是怎么運作的呢?
- 首先我們通過maven或者sbt等,將我們的應用以及其依賴的jar包完整的打包,利用spark-submit命令將jar提交到spark;
- 提交程序的這個Spark節點會作為Driver節點,並從Cluster Manager中獲取資源;
- 程序會在worker節點中獲得executor用來執行我們的任務;
- 在spark程序中每次RDD的action變換會產生一個新的job,每個job包含多個task;
- 而RDD在進行Transformation時,會產生新的stage;
- task會被送往各個executor運行;
- 而最終的計算結果會回到driver節點進行匯總並輸出(如reduceByKey)。
針對這個過程,我們可以從微觀和宏觀兩個角度把控,將RDD的操作依賴關系,以及task在集群間的分配情況綜合起來看,如下圖:
Spark監控界面
在提交Spark任務時,我們可以在提交命令中加入一項參數--conf spark.ui.port=xxxx,其中"xxxx"為你需要的端口號,這樣在瀏覽器中我們就可以利用Spark提供的UI界面對Application的運行情況進行監控如下圖:
踩坑小記
在spark平時的使用過程當中,由於程序在整個集群當中奔跑,經常會遇到很多莫名其妙的錯誤,有時候通過日志給定的錯誤很難真的定位到真正的原因,那叫一個憂傷阿T^T
Driver程序崩潰
出現這類錯誤,往往日志中會提到JVM。在Spark中大多數操作會分擔到各個結點的worker進行計算,但是對於shuffle類操作,如我們經常會用的reduceByKey或者collect等,都會使得spark將所有結點的數據匯總到driver進行計算,這樣就會導致driver需要遠大於正常worker的內存,所以遇到這類問題,最先可以考慮的便是增加driver結點的內存,增加方式如下:
--driver-memory 15g
kafka編碼錯誤
在利用spark streaming的python版本,消費kafka數據的時候,遇到類似下面的問題:
UnicodeDecodeError: 'utf8' codec can't decode byte 0x85 in position 87: invalid start byte
我們知道python2中的字符串形式有兩種即unicode形式和普通str形式,通過反復分析日志和查看kafka.py的源碼找到了問題所在。首先在pyspark的kafka API中,找到createStream函數的如下說明:
圖中紅框內清楚的說明了,在解析kafka傳來的數據的時候,默認使用了utf8_decoder函數,那這個東東是個什么玩意呢,找到kafka.py的源碼,其定義如下:
# 默認解碼器
def utf8_decoder(s):
""" Decode the unicode as UTF-8 """
if s is None:
return None
return s.decode('utf-8')
class KafkaUtils(object):
@staticmethod
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
storageLevel=StorageLevel.MEMORY_AND_DISK_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
Create an input stream that pulls messages from a Kafka Broker.
:param ssc: StreamingContext object
:param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
:param groupId: The group id for this consumer.
:param topics: Dict of (topic_name -> numPartitions) to consume.
Each partition is consumed in its own thread.
:param kafkaParams: Additional params for Kafka
:param storageLevel: RDD storage level.
:param keyDecoder: A function used to decode key (default is utf8_decoder)
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object
"""
if kafkaParams is None:
kafkaParams = dict()
kafkaParams.update({
"zookeeper.connect": zkQuorum,
"group.id": groupId,
"zookeeper.connection.timeout.ms": "10000",
})
if not isinstance(topics, dict):
raise TypeError("topics should be dict")
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
helper = KafkaUtils._get_helper(ssc._sc)
jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
...
我們看到默認的解碼器直接調用了s.decode,那么當kafka傳來的數據中有非utf8編碼的字符時,整個stage就會掛掉,所以修改如下:
def my_uft8_decoder(s):
if s is None:
return None
try:
return s.decode('utf-8', 'replace')
except Exception, e:
print e;
return None
# 創建stream時傳入
kafkaStream = KafkaUtils.createStream(ssc, \
conf.kafka_quorum, conf.kafka_consumer_group, {conf.kafka_topic:conf.spark_streaming_topic_parallelism}, {
"auto.commit.interval.ms":"50000",
"auto.offset.reset":"smallest",
},
StorageLevel.MEMORY_AND_DISK_SER,
valueDecoder=my_uft8_decoder
)
如果采用createDirectStream來創建context與此類似,不再贅述。所以在pyspark的kafka消費中遇到解碼問題可以關注一下這里。
總結
挺長的一篇整理,前后拖了很久。本篇博文我的構思主要就是,當我們提交了一個應用到Spark時,我們需要大致了解Spark做了什么,這里我並沒有分析源碼(因為我木有看哈哈)。從最微觀的RDD的操作,到宏觀的整個集群的調度運算,這樣從RDD看集群調度就有了一個整體的認識,當遇到問題的時候就更容易排查,遇到性能拼瓶頸也容易查找。OK,這就是這篇博文的全部整理哈,其中末尾部分闡述了在實際項目中遇到的一些問題和坑,如果有相似的問題的朋友可以參考下。
做個小廣告,項目是WeTest輿情,企鵝風訊,感興趣的歡迎大家來踩踩:
http://wetest.qq.com/bee/
參考文獻:
- 《Spark最佳實踐》陳歡 林世飛(鵝廠大神的作品^v^)
- Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation. USENIX Association, 2012: 2-2.
- spark源碼閱讀
- 【Spark】RDD操作詳解2——值型Transformation算子
- Spark Programming Guide
- Spark 開發指南
- pyspark.streaming module
- RDD 依賴
- Cluster Mode Overview
- Apache Spark探秘:三種分布式部署方式比較