【原】Spark on YARN


在YARN上運行Spark

在Spark0.6.0版本開始支持YARN模式,隨后的版本在逐漸地完善。

在YARN上啟動Spark

確保HADOOP_CONF_DIR或YARN_CONF_DIR屬性的值已經指向了Hadoop集群的配置文件。Spark通常使用這些配置信息來向HDFS寫入數據和連接到YARN資源管理器。這個目錄下所有的文件將會被分發到YARN集群中,所以所有應用使用的容器都使用同樣的配置。如果Java的系統屬性或YARN沒有管理的環境變量等配置,它們應該在Spark 的應用配置項中配置。

在YARN上啟動Spark有兩種部署模式。在Cluster模式中,Sparkdriver程序運行在被YARN管理的集群中的任何一個master進程中,並且client初始化應用后可以退出。在Client模式中,driver程序運行在client進程中,並且這個應用程序的master只能被用來從YARN上請求資源。

和Spark StandaloneMesos模式不同的是,master的地址被指定在--master參數中,在YARN模式中,ResourceManager的地址可以在Hadoop的配置文件中找到。這樣,--master的的參數是yarn

在cluster模式中啟動Spark應用程序:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

舉例:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \

    --master yarn \

    --deploy-mode cluster \

    --driver-memory 4g \

    --executor-memory 2g \

    --executor-cores 1 \

    --queue thequeue \

    lib/spark-examples*.jar \

    10

上面的應用例子將會啟動一個YARN client程序,它將會啟動默認的應用Master。而SparkPi將會作為應用Master的一個子線程運行。client將會周期性地輪詢應用Master來達到轉態的更新並把它們顯示在console終端。一旦你的應用程序運行完畢,client將會退出。

在client模式中啟動Spark應用和cluster模式一樣,只是將cluster替換為client。如下所示:

$ ./bin/spark-shell --master yarn --deploy-mode client

添加其他Jar

在cluster模式中,driver程序和client在不同的機器上,所以只對於本機的可行的SparkContext.addJar將會失效。為了使client繼續能使用SparkContext.addJar,可以在創建命令時給--jars選項賦值。

$ ./bin/spark-submit --class my.main.Class \

    --master yarn \

    --deploy-mode cluster \

    --jars my-other-jar.jar,my-other-other-jar.jar

    my-main-jar.jar

    app_arg1 app_arg2

預備

在YARN上運行Spark要求一個支持YARN的一個二進制發布包。你可以在官網上下載,也可以自己編譯一個。

配置

Spark on YARN上的許多配置和其他模式基本上一樣。

調試你的應用程序

在YARN中,executor和應用master運行在“containers”(容器)中。應用程序運行完畢后,YARN提供了兩種存放容器日志的方式。如果日志聚合服務被開啟的話(通過yarn.log-aggregation-enable來配置),容器日志將會被拷貝到HDFS中並且刪除本機上的日志文件。這些日志文件使用yarn logs命令可以在任何一台集群中的機器看到。如下:

 

yarn logs -applicationId <app ID>

上面的命令將會打印出應用程序申請到的所有容器日志文件的內容。你也可以通過HDFS shellAPI來直接看這些容器文件。這些日志文件的目錄可以查看YARN配置(yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix)。這些日志在Spark Web UI的“Executors”的選項卡中也能看到。你需要啟動Spark history serverMapReduce history server並且正確地在yarn-site.xml配置好 yarn.log.server.url選項。這個Spark history server UI的日志URL將會把重定向到MapReducehistory server,從而顯示日志信息。

當日志聚合服務關閉時,日志被保留在每台機器的YARN_APP_LOGS_DIR目錄下,該目錄通常被用來配置為/tmp/logs或$HADOOP_HOME/logs/userlogs,這取決於Hadoop的版本和安裝。查看一個容器的日志信息需要到對應的主機上的這個目錄下查找。子目錄名稱通過應用ID和容器ID來構成。這種日志在Spark WebUIExecutors選項卡中也能看到並且不要求啟動MapReduce history server,因為不需要讀取HDFS上的數據。

回顧一下每個容器創建的環境,增加yarn.nodemanager.delete.debug-delay-sec到一個大數值(比如36000),並且在容器上創建的節點上的yarn.nodemanager.local-dirs中得到應用程序的緩存。這個目錄包括創建的腳本,JARs和用於創建每個容器的所有環境變量。它對於調試classpath問題是特別有用的。(注意允許這種方式在集群的設置和所有節點的重啟需要管理員權限,這樣的話它宿主機上不可用。)

對每個應用的masterexecutors使用自定義的log4j配置的話, 請看下:

  • 用spark-submit上傳一個自己編寫的log4j.properties文件,通過--files參數把它和應用一起提交。
  • 給每個driver添加值-Dlog4j.configuration=<location of configuration file>到spark.driver.extraJavaOptions選項中。注意如果使用該選項的話,該文件需要在所有的節點上都存在。

l 上傳$SPARK_CONF_DIR/log4j.properties文件后,它會和其他的配置一樣自己更新。注意如果多個option指定時,上面介紹的那種option比這種有更高的優先權。

Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).

注意,對於第一種option而言,所有的executors和應用程序master將會使用同樣的log4j配置,當他們運行在一樣的節點上可能會出問題(例如:寫入到同樣的日志文件中,也就是並發寫,不難理解吧)

如果在Yarn中你需要一個合適的位置來存放日志文件,通過在你的log4j.properties中配置spark.yarn.app.container.log.dir,那么yarn可以更好的聚合它們並展示。例如:

log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log.對於Streaming程序而言,配置RollingFileAppender和yarn的日志文件目錄將避免大日志文件造成的磁盤移除,而且,日志也可以很好地被YARN使用。

 

重點提示

 

  • 在調度決策中主要的資源請求是否得到,取決於正在使用的調度器和它的具體配置。
  • 在cluster模式中,Spark executorsdriver將會使用為YARN配置的本地文件目錄(Hadoop YARN配置項 yarn.nodemanager.local-dirs)。如果使用特定的spark.local.dir,它將會失效。在client模式中,Spark executors將會使用YARN配置的本地目錄,但Spark driver將使用spark.local.dir選項定義好的。這是因為Client模式下Spark driver只是Sparkexecutor在執行,沒有運行在YARN集群中。
  • --files和--archives選項支持和Hadoop一樣使用#來指定文件別名。例如:你可以指定--files localtest.txt#appSees.txt,那么它將會把本地文件的localtest.txt文件上傳到HDFS中,可理解為,它在HDFS中文件名將是appSees.tx,在YARN中使用appSees.txt文件名即可。
  • --jars 選項,如果你在使用本地文件和運行在cluster模式時,SparkContext.addJar函數將會起作用。如果你正在用HDFSHTTPHTTPSFTP的文件時,它不需要。

 

 

Spark 屬性配置項,可根據如下列表進行參數的調整:

 

Property Name Default Meaning
spark.yarn.am.memory 512m Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). In cluster mode, use spark.driver.memory instead.

Use lower-case suffixes, e.g. k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively.

spark.driver.cores 1 Number of cores used by the driver in YARN cluster mode. Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN Application Master. In client mode, use spark.yarn.am.cores to control the number of cores used by the YARN Application Master instead.
spark.yarn.am.cores 1 Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead.
spark.yarn.am.waitTime 100s In cluster mode, time for the YARN Application Master to wait for the SparkContext to be initialized. In client mode, time for the YARN Application Master to wait for the driver to connect to it.
spark.yarn.submit.file.replication The default HDFS replication (usually 3) HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
spark.yarn.preserve.staging.files false Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.
spark.yarn.scheduler.heartbeat.interval-ms 3000 The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. The value is capped at half the value of YARN's configuration for the expiry interval, i.e. yarn.am.liveness-monitor.expiry-interval-ms.
spark.yarn.scheduler.initial-allocation.interval 200ms The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will doubled on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms is reached.
spark.yarn.max.executor.failures numExecutors * 2, with minimum of 3 The maximum number of executor failures before failing the application.
spark.yarn.historyServer.address (none) The address of the Spark history server, e.g. host.com:18080. The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI. For this property, YARN properties can be used as variables, and these are substituted by Spark at runtime. For example, if the Spark history server runs on the same node as the YARN ResourceManager, it can be set to ${hadoopconf-yarn.resourcemanager.hostname}:18080.
spark.yarn.dist.archives (none) Comma separated list of archives to be extracted into the working directory of each executor.
spark.yarn.dist.files (none) Comma-separated list of files to be placed in the working directory of each executor.
spark.executor.instances 2 The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled. If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamic allocation is turned off and the specified number of spark.executor.instances is used.
spark.yarn.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
spark.yarn.driver.memoryOverhead driverMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
spark.yarn.am.memoryOverhead AM memory * 0.10, with minimum of 384 Same as spark.yarn.driver.memoryOverhead, but for the YARN Application Master in client mode.
spark.yarn.am.port (random) Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
spark.yarn.queue default The name of the YARN queue to which the application is submitted.
spark.yarn.jar (none) The location of the Spark jar file, in case overriding the default location is desired. By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a jar on HDFS, for example, set this configuration to hdfs:///some/path.
spark.yarn.access.namenodes (none) A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032. The Spark application must have access to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
spark.yarn.appMasterEnv.[EnvironmentVariableName] (none) Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN. The user can specify multiple of these and to set multiple environment variables. In cluster mode this controls the environment of the Spark driver and in client mode it only controls the environment of the executor launcher.
spark.yarn.containerLauncherMaxThreads 25 The maximum number of threads to use in the YARN Application Master for launching executor containers.
spark.yarn.am.extraJavaOptions (none) A string of extra JVM options to pass to the YARN Application Master in client mode. In cluster mode, use spark.driver.extraJavaOptions instead.
spark.yarn.am.extraLibraryPath (none) Set a special library path to use when launching the YARN Application Master in client mode.
spark.yarn.maxAppAttempts yarn.resourcemanager.am.max-attempts in YARN The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration.
spark.yarn.am.attemptFailuresValidityInterval (none) Defines the validity interval for AM failure tracking. If the AM has been running for at least the defined interval, the AM failure count will be reset. This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
spark.yarn.submit.waitAppCompletion true In YARN cluster mode, controls whether the client waits to exit until the application completes. If set to true, the client process will stay alive reporting the application's status. Otherwise, the client process will exit after submission.
spark.yarn.am.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes AM will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.executor.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes executors will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.tags (none) Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
spark.yarn.keytab (none) The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically. (Works also with the "local" master)
spark.yarn.principal (none) Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master)
spark.yarn.config.gatewayPath (none) A path that is valid on the gateway host (the host where a Spark application is started) but may differ for paths for the same resource in other nodes in the cluster. Coupled with spark.yarn.config.replacementPath, this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes.

The replacement path normally will contain a reference to some environment variable exported by YARN (and, thus, visible to Spark containers).

For example, if the gateway node has Hadoop libraries installed on /disk1/hadoop, and the location of the Hadoop install is exported by YARN as the HADOOP_HOME environment variable, setting this value to /disk1/hadoop and the replacement path to $HADOOP_HOME will make sure that paths used to launch remote processes properly reference the local YARN configuration.

spark.yarn.config.replacementPath (none) See spark.yarn.config.gatewayPath.
spark.yarn.security.tokens.${service}.enabled

true

 

Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled. By default, delegation tokens for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run.

Currently supported services are: hive, hbase


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM