上一篇說到Spark的yarn client運行模式,它與yarn cluster模式的主要區別就是前者Driver是運行在客戶端,后者Driver是運行在yarn集群中。yarn client模式一般用在交互式場景中,比如spark shell, spark sql等程序,但是該模式下運行在客戶端的Driver與Yarn集群有大量的網絡交互,如果客戶端與集群之間的網絡不是很好,可能會導致性能問題。因此一般在生產環境中,大部分還是采用yarn cluster模式運行spark程序。
下面具體還是用計算PI的程序來說明,examples中該程序有三個版本,分別采用Scala、Python和Java語言編寫。本次用Python程序pi.py做說明。
1 from __future__ import print_function 2 3 import sys 4 from random import random 5 from operator import add 6 7 from pyspark.sql import SparkSession 8 9 10 if __name__ == "__main__": 11 """ 12 Usage: pi [partitions] 13 """ 14 spark = SparkSession\ 15 .builder\ 16 .appName("PythonPi")\ 17 .getOrCreate() 18 19 partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 20 n = 100000 * partitions 21 22 def f(_): 23 x = random() * 2 - 1 24 y = random() * 2 - 1 25 return 1 if x ** 2 + y ** 2 <= 1 else 0 26 27 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) 28 print("Pi is roughly %f" % (4.0 * count / n)) 29 30 spark.stop()
程序邏輯與上一篇Scala程序一樣,就不再多做說明了。
下面來以yarn cluster方式來執行這個程序,注意執行程序前先要啟動hdfs和yarn,最好同時啟動spark的history server,這樣即使在程序運行完以后也可以從Web UI中查看到程序運行情況。
輸入以下命令:
[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster $SPARK_HOME/examples/src/main/python/pi.py
以下是程序運行輸出信息部分截圖,
開始部分:
中間部分:
結束部分:
由於程序是以yarn cluster方式運行的,因此Driver是運行在Yarn集群當中(在BruceCentOS3上的ApplicationMaster進程當中),同時在BruceCentOS和BruceCentOS2上各運行了1個Executor進程(進程名字:CoarseGrainedExecutorBackend),而BruceCentOS4上的SparkSubmit進程僅僅作為yarn client向yarn集群提交spark程序。作為對比,在yarn client模式當中,客戶端SparkSubmit進程不僅作為yarn client提交程序,而且同時還會運行Driver,並啟動SparkContext,並且向Executor分配和管理Task,最后收集運行結果,因此yarn client模式程序輸出信息會顯示最終的打印結果。然而在yarn cluster模式當中,由於Driver運行在yarn集群的ApplicationMaster中,因此最終結果需要到ApplicationMaster進程的日志中取查看。可以通過如下命令查看。
SparkUI上的Executor信息:
BruceCentOS4上的客戶端進程:
BruceCentOS3上的ApplicationMaster進程(包含Spark Driver):
BruceCentOS上的Executor:
BruceCentOS2上的Executor:
下面具體描述下Spark程序在yarn cluster模式下運行的具體流程。
這里是一個流程圖:
- Spark Yarn Client向YARN提交應用程序,類似於MapReduce向Yarn提交程序,會將程序文件、庫文件和配置文件等上傳到HDFS。
- ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,其中ApplicationMaster中會運行Spark Driver,並進行SparkContext的初始化。
- ApplicationMaster向ResourceManager注冊,這樣用戶可以直接通過ResourceManager查看應用程序的運行狀態,然后它將采用輪詢的方式通過RPC協議為各個任務申請資源,並監控它們的運行狀態直到運行結束。
- 一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向ApplicationMaster中的SparkContext注冊並申請Task。這一點和Standalone模式一樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的調度。
- ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster匯報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務。
- 應用程序運行完成后,ApplicationMaster向ResourceManager申請注銷並關閉自己。
以上就是個人對Spark運行模式(yarn cluster)的一點理解,其中參考了“求知若渴 虛心若愚”博主的“Spark(一): 基本架構及原理”的部分內容(其中基於Spark2.3.0對某些細節進行了修正),在此表示感謝。