通過之前的文章【Spark RDD詳解】,大家應該了解到Spark會通過DAG將一個Spark job中用到的所有RDD划分為不同的stage,每個stage內部都會有很多子任務處理數據,而每個stage的任務數是決定性能優劣的關鍵指標。
首先來了解一下Spark中分區的概念,其實就是將要處理的數據集根據一定的規則划分為不同的子集,每個子集都算做一個單獨的分區,由集群中不同的機器或者是同一台機器不同的core進行分區並行處理。
Spark對接不同的數據源,在第一次得到的分區數是不一樣的,但都有一個共性:對於map類算子或者通過map算子產生的彼此之間具有窄依賴關系的RDD的分區數,子RDD分區與父RDD分區是一致的。而對於通過shuffle差生的子RDD則由分區器決定,當然默認分區器是HashPartitioner,我們完全可以根據實際業務場景進行自定義分區器,只需繼承Parttioner組件,主要重寫幾個方法即可
以加載hdfs文件為例,Spark在讀取hdfs文件還沒有調用其他算子進行業務處理前,得到的RDD分區數由什么決定呢?關鍵在於文件是否可切分!
對於可切分文件,如text文件,那么通過加載文件得到的RDD的分區數默認與該文件的block數量保持一致;
對於不可切分文件,它只有一個block塊,那么得到的RDD的分區數默認也就是1。
當然,我們可以通過調用一些算子對RDD進行重分區,如repartition。
這里必須要強調一點,很多小伙伴不理解,RDD既然不存儲數據,那么加載過來的文件都跑哪里去了呢?這里先給大家提個引子——blockmanager,Spark自己實現的存儲管理器。RDD的存儲概念其實block,至於block的大小可以根據不同的數據源進行調整,blockmanager的數據存儲、傳輸都是以block進行的。至於block內部傳輸的時候,它的大小也是可以通過參數控制的,比如廣播變量、shuffle傳輸時block的大小等。
下面再通過大家熟知的一個參數spark.default.parallelism為引,聊一聊Spark並行度都由哪些因素決定?
上圖是spark官網關於spark.default.parallelism參數說明:
-
對於reduceByKey和join這些分布式shuffle算子操作,取決於它的父RDD中分區數的最大值
-
對於沒有父RDD的的算子,比如parallelize,依賴於集群管理器:
-
本地模式:取決於本地機器的核數
-
如果集群管理器是Mesos,則為8
-
其他的:對比所有executor上總核數與2比較,哪個大是哪個
當然上面這些都是默認值,如果我們自己設置了分區數,情況就會有所變化,直接看源碼【查看org.apache.spark.Partitioner源碼defaultPartitioner方法】
你會發現,如果你使用reducebykey、groupByKey等這些帶shuffle的算子,建議不要通過上述方法讓程序內部去推測。完全可以通過傳入一個確定的分區數或者自己實現一個分區器來做處理。當然這個確定的分區數也不是貿貿然設定的,需要結合你的業務場景根據實際情況來確定多少合適。比如shuffle時流經的數據量,這個就要結合分區數和shuffle總數據量來做適當調整,處理不好的結果極有可能導致數據傾斜等問題...
筆者再次建議,學習Spark一定要多看Spark官網http://spark.apache.org/,並且多看源碼
關注微信公眾號:大數據學習與分享,獲取更對技術干貨