有關這個問題,似乎這個在某些時候,用python寫好,且spark沒有響應的算法支持, 能否能在YARN集群上 運行PySpark方式,
將python分析程序提交上去?
Spark Application可以直接運行在YARN集群上,這種運行模式,會將資源的管理與協調統一交給YARN集群去處理,這樣能夠實現構建於YARN集群之上Application的多樣性,比如可以運行MapReduc程序,可以運行HBase集群,也可以運行Storm集群,還可以運行使用Python開發機器學習應用程序,等等。
我們知道,Spark on YARN又分為client模式和cluster模式:在client模式下,Spark Application運行的Driver會在提交程序的節點上,而該節點可能是YARN集群內部節點,也可能不是,一般來說提交Spark Application的客戶端節點不是YARN集群內部的節點,那么在客戶端節點上可以根據自己的需要安裝各種需要的軟件和環境,以支撐Spark Application正常運行。在cluster模式下,Spark Application運行時的所有進程都在YARN集群的NodeManager節點上,而且具體在哪些NodeManager上運行是由YARN的調度策略所決定的。
對比這兩種模式,最關鍵的是Spark Application運行時Driver所在的節點不同,而且,如果想要對Driver所在節點的運行環境進行配置,區別很大,但這對於PySpark Application運行來說是非常關鍵的。
PySpark是Spark為使用Python程序編寫Spark Application而實現的客戶端庫,通過PySpark也可以編寫Spark Application並在Spark集群上運行。Python具有非常豐富的科學計算、機器學習處理庫,如numpy、pandas、scipy等等。為了能夠充分利用這些高效的Python模塊,很多機器學習程序都會使用Python實現,同時也希望能夠在Spark集群上運行。
PySpark Application運行原理
理解PySpark Application的運行原理,有助於我們使用Python編寫Spark Application,並能夠對PySpark Application進行各種調優。PySpark構建於Spark的Java API之上,數據在Python腳本里面進行處理,而在JVM中緩存和Shuffle數據,數據處理流程如下圖所示(來自Apache Spark Wiki):
Spark Application會在Driver中創建pyspark.SparkContext對象,后續通過pyspark.SparkContext對象來構建Job DAG並提交DAG運行。使用Python編寫PySpark Application,在Python編寫的Driver中也有一個pyspark.SparkContext對象,該pyspark.SparkContext對象會通過Py4J模塊啟動一個JVM實例,創建一個JavaSparkContext對象。PY4J只用在Driver上,后續在Python程序與JavaSparkContext對象之間的通信,都會通過PY4J模塊來實現,而且都是本地通信。
PySpark Application中也有RDD,對Python RDD的Transformation操作,都會被映射到Java中的PythonRDD對象上。對於遠程節點上的Python RDD操作,Java PythonRDD對象會創建一個Python子進程,並基於Pipe的方式與該Python子進程通信,將用戶編寫Python處理代碼和數據發送到Python子進程中進行處理。
下面,我們基於Spark on YARN模式,並根據當前企業所具有的實際集群運行環境情況,來說明如何在Spark集群上運行PySpark Application,大致分為如下3種情況:
- YARN集群配置Python環境
這種情況,如果是初始安裝YARN、Spark集群,並考慮到了當前應用場景需要支持Python程序運行在Spark集群之上,這時可以准備好對應Python軟件包、依賴模塊,在YARN集群中的每個節點上進行安裝。這樣,YARN集群的每個NodeManager上都具有Python環境,可以編寫PySpark Application並在集群上運行。目前比較流行的是直接安裝Python虛擬環境,使用Anaconda等軟件,可以極大地簡化Python環境的管理工作。
這種方式的缺點是,如果后續使用Python編寫Spark Application,需要增加新的依賴模塊,那么就需要在YARN集群的每個節點上都進行該新增模塊的安裝。而且,如果依賴Python的版本,可能還需要管理不同版本Python環境。因為提交PySpark Application運行,具體在哪些NodeManager上運行該Application,是由YARN的調度器決定的,必須保證每個NodeManager上都具有Python環境(基礎環境+依賴模塊)。
- YARN集群不配置Python環境
這種情況,更適合企業已經安裝了規模較大的YARN集群,並在開始使用時並未考慮到后續會使用基於Python來編寫Spark Application,並且不想在YARN集群的NodeManager上安裝Python基礎環境及其依賴模塊。我們參考了Benjamin Zaitlen的博文(詳見后面參考鏈接),並基於Anaconda軟件環境進行了實踐和驗證,具體實現思路如下所示:
- 在任意一個Linux OS的節點上,安裝Anaconda軟件
- 通過Anaconda創建虛擬Python環境
- 在創建好的Python環境中下載安裝依賴的Python模塊
- 將整個Python環境打成zip包
- 提交PySpark Application時,並通過
--archives
選項指定zip包路徑
下面進行詳細說明:
首先,我們在CentOS 7.2上,基於Python 2.7,下載了Anaconda2-5.0.0.1-Linux-x86_64.sh安裝軟件,並進行了安裝。Anaconda的安裝路徑為/root/anaconda2。
然后,創建一個Python虛擬環境,執行如下命令:
1
|
conda create -n mlpy_env --copy -y -q python=2 numpy pandas scipy
|
上述命令創建了一個名稱為mlpy_env的Python環境,--copy
選項將對應的軟件包都安裝到該環境中,包括一些C的動態鏈接庫文件。同時,下載numpy、pandas、scipy這三個依賴模塊到該環境中。
接着,將該Python環境打包,執行如下命令:
1
2
|
cd
/root/anaconda2/envs
zip -r mlpy_env.zip mlpy_env
|
該zip文件大概有400MB左右,將該zip壓縮包拷貝到指定目錄中,方便后續提交PySpark Application:
1
|
cp
mlpy_env.zip
/tmp/
|
最后,我們可以提交我們的PySpark Application,執行如下命令:
1
2
3
4
5
|
PYSPARK_PYTHON=.
/ANACONDA/mlpy_env/bin/python
spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=.
/ANACONDA/mlpy_env/bin/python
\
--master yarn-cluster \
--archives
/tmp/mlpy_env
.zip
#ANACONDA \
/var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies
.py
|
上面的test_pyspark_dependencies.py文件中,使用了numpy、pandas、scipy這三個依賴包的函數,通過上面提到的YARN集群的cluster模式可以運行在Spark集群上。
可以看到,上面的依賴zip壓縮包將整個Python的運行環境都包含在里面,在提交PySpark Application時會將該環境zip包上傳到運行Application的所在的每個節點上,並解壓縮后為Python代碼提供運行時環境。如果不想每次都從客戶端將該環境文件上傳到集群中運行PySpark Application的節點上,也可以將zip包上傳到HDFS上,並修改--archives
參數的值為hdfs:///tmp/mlpy_env.zip#ANACONDA,也是可以的。
另外,需要說明的是,如果我們開發的/var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies.py文件中,也依賴的一些我們自己實現的處理函數,具有多個Python依賴的文件,想要通過上面的方式運行,必須將這些依賴的Python文件拷貝到我們創建的環境中,對應的目錄為mlpy_env/lib/python2.7/site-packages/下面。
- 基於混合編程語言環境
假如我們還是希望使用Spark on YARN模式來運行PySpark Application,但並不將Python程序提交到YARN集群上運行。這時,我們可以考慮使用混合編程語言的方式,來處理數據任務。比如,機器學習Application具有迭代計算的特性,更適合在一個高配的節點上運行;而普通的ETL數據處理具有多機並行處理的特點,適合放到集群上進行分布式處理。
一個完整的機器學習Application的設計與構建,可以將算法部分和數據准備部分分離出來,使用Scala/Java進行數據預處理,輸出一個機器學習算法所需要(更便於迭代、尋優計算)的輸入數據格式,這會極大地壓縮算法輸入數據的規模,從而使算法迭代計算充分利用單機本地的資源(內存、CPU、網絡),這可能會比直接放到集群中計算要快得多。
因此,我們在對機器學習Application准備數據時,使用原生的Scala編程語言實現Spark Application來處理數據,包括轉換、統計、壓縮等等,將滿足算法輸入格式的數據輸出到HDFS指定目錄中。在性能方面,對數據規模較大的情況下,在Spark集群上處理數據,Scala/Java實現的Spark Application運行性能要好一些。然后,算法迭代部分,基於豐富、高性能的Python科學計算模塊,使用Python語言實現,其實直接使用PySpark API實現一個機器學習PySpark Application,運行模式為YARN client模式。這時,就需要在算法運行的節點上安裝好Python環境及其依賴模塊(而不需要在YARN集群的節點上安裝),Driver程序從HDFS中讀取輸入數據(緩存到本地),然后在本地進行算法的迭代計算,最后輸出模型。
總結
對於重度使用PySpark的情況,比如偏向機器學習,可以考慮在整個集群中都安裝好Python環境,並根據不同的需要進行依賴模塊的統一管理,能夠=極大地方便PySpark Application的運行。
不在YARN集群上安裝Python環境的方案,會使提交的Python環境zip包在YARN集群中傳輸帶來一定開銷,而且每次提交一個PySpark Application都需要打包一個環境zip文件,如果有大量的Python實現的PySpark Application需要在Spark集群上運行,開銷會越來越大。另外,如果PySpark應用程序修改,可能需要重新打包環境。但是這樣做確實不在需要考慮YARN集群集群節點上的Python環境了,任何版本Python編寫的PySpark Application都可以使用集群資源運行。
關於該問題,SPARK-13587(詳見下面參考鏈接)也在討論如果優化該問題,后續應該會有一個比較合適的解決方案。
參考鏈接
- https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
- https://issues.apache.org/jira/browse/SPARK-13587
- http://quasiben.github.io/blog/2016/4/15/conda-spark/
- https://blog.cloudera.com/blog/2017/04/use-your-favorite-python-library-on-pyspark-cluster-with-cloudera-data-science-workbench/
- https://repo.continuum.io/pkgs/misc/parcels/archive/
- https://repo.continuum.io/pkgs/misc/parcels/
- https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_python.html
- https://docs.anaconda.com/anaconda/user-guide/tasks/integration/cloudera
- http://shiyanjun.cn/archives/1738.html