Running Spark on YARN

Support for running on YARN (Hadoop NextGen) was added to Spark in version 0.6.0 and improved in subsequent releases.

Preparation

To run Spark on YARN, you need a distribution of Spark built with YARN support. Pre-built binary distributions can be downloaded from the Spark website. To build Spark yourself, refer to Building Spark.

Configuration

Most of the configuration properties are the same as for other deployment modes; see the Spark configuration guide for more information. The following configuration properties are specific to Spark on YARN; a short example of setting them follows the list.

Configuration Properties

spark.yarn.applicationMaster.waitTries (default: 10)
  Set the number of times the ApplicationMaster waits for the Spark master, as well as the number of tries it waits for the SparkContext to be initialized.

spark.yarn.submit.file.replication (default: the default HDFS replication, usually 3)
  HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.

spark.yarn.preserve.staging.files (default: false)
  Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.

spark.yarn.scheduler.heartbeat.interval-ms (default: 5000)
  The interval in ms at which the Spark application master heartbeats into the YARN ResourceManager.

spark.yarn.max.executor.failures (default: numExecutors * 2, with a minimum of 3)
  The maximum number of executor failures before failing the application.

spark.yarn.historyServer.address (default: none)
  The address of the Spark history server (e.g. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes, to link the application from the ResourceManager UI to the Spark history server UI.

spark.yarn.dist.archives (default: none)
  Comma-separated list of archives to be extracted into the working directory of each executor.

spark.yarn.dist.files (default: none)
  Comma-separated list of files to be placed in the working directory of each executor.

spark.yarn.executor.memoryOverhead (default: executorMemory * 0.07, with a minimum of 384)
  The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

spark.yarn.driver.memoryOverhead (default: driverMemory * 0.07, with a minimum of 384)
  The amount of off-heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).

spark.yarn.queue (default: default)
  The name of the YARN queue to which the application is submitted.

spark.yarn.jar (default: none)
  The location of the Spark jar file, in case overriding the default location is desired. By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a jar on HDFS, for example, set this configuration to "hdfs:///some/path".

spark.yarn.access.namenodes (default: none)
  A list of secure HDFS namenodes your Spark application is going to access. For example, spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032. The Spark application must have access to the namenodes listed, and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.

spark.yarn.appMasterEnv.[EnvironmentVariableName] (default: none)
  Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN. The user can specify multiple of these to set multiple environment variables. In yarn-cluster mode this controls the environment of the Spark driver, and in yarn-client mode it only controls the environment of the executor launcher.

spark.yarn.containerLauncherMaxThreads (default: 25)
  The maximum number of threads to use in the application master for launching executor containers.
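
These Spark on YARN properties are set like any other Spark configuration, e.g. in conf/spark-defaults.conf or via --conf on the spark-submit command line. A brief sketch follows; the queue name, history server address, and overhead value are placeholder choices, not defaults:

# conf/spark-defaults.conf (placeholder values)
spark.yarn.queue                    thequeue
spark.yarn.historyServer.address    history-host.example.com:18080
spark.yarn.executor.memoryOverhead  512

# or equivalently, per submission:
./bin/spark-submit --master yarn-cluster \
  --conf spark.yarn.queue=thequeue \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --class path.to.your.Class <app jar> [app options]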

Launching Spark on YARN

Make sure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory containing the configuration files for the Hadoop cluster. These configurations are used to write to the dfs and to connect to the YARN ResourceManager.
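
For example, a minimal sketch of the environment setup before submitting (the /etc/hadoop/conf path is an assumption; use whatever directory holds your cluster's client-side configuration):

# point Spark at the Hadoop/YARN client configuration (assumed path)
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
# any spark-submit or spark-shell run after this picks up the HDFS and
# ResourceManager settings from those files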

There are two modes for launching Spark on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process managed by YARN, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used to request resources from YARN. Unlike Spark standalone and Mesos modes, where a concrete master address must be specified, in YARN mode the ResourceManager's address is read from the Hadoop configuration, so the master parameter is simply "yarn-client" or "yarn-cluster".

To launch Spark in yarn-cluster mode:

./bin/spark-submit --class path.to.your.Class --master yarn-cluster [options] <app jar> [app options]

For example:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10

The command above starts a YARN client program which starts the default Application Master. SparkPi will then run as a child thread of the Application Master. The client periodically polls the Application Master for status updates and displays them in the console, and exits once your application has finished running. See the "Debugging Your Application" section below for how to view driver and executor logs.

To launch in yarn-client mode, use the same command, but replace "yarn-cluster" with "yarn-client".

To run spark-shell:

$ ./bin/spark-shell --master yarn-client

Adding Other JARs

In yarn-cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar does not work out of the box with jars that are local to the client. To make such jars available to SparkContext.addJar, include them with the --jars option in the launch command:

$ ./bin/spark-submit --class my.main.Class \
--master yarn-cluster \
--jars my-other-jar.jar,my-other-other-jar.jar \
my-main-jar.jar \
app_arg1 app_arg2

Debugging Your Application

In YARN terminology, executors and application masters run inside "containers". YARN has two modes for handling container logs after an application has completed. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted from the local machine. These logs can be viewed from any machine in the cluster with the "yarn logs" command:

yarn logs -applicationId <app ID>

The command above prints out all log files from all containers of the given application.

When log aggregation is not turned on, logs are kept locally on each machine under YARN_APP_LOGS_DIR, usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing logs for a container then requires logging in to the host that contains them and looking in this directory, where logs are organized into subdirectories by application ID and container ID.

To review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes where containers were launched. This directory contains the launch scripts, JARs, and all environment variables used for launching each container, which is very useful for debugging the application. (Note that this requires admin privileges and a restart of all node managers, so it is not applicable to hosted clusters.)
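
As a rough sketch only (the exact layout depends on your NodeManager settings; the local-dirs path and application ID below are made-up placeholders), the retained launch environment can then be found under the application cache:

# value of yarn.nodemanager.local-dirs on the node (placeholder path)
NM_LOCAL_DIR=/hadoop/yarn/local
# container directories live under usercache/<user>/appcache/<application id>
ls $NM_LOCAL_DIR/usercache/$USER/appcache/application_XXXXXXXXXXXXX_XXXX/
# each container directory keeps its launch script, JARs, and captured environment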

To use a custom log4j configuration for the application master or the executors, there are two options:

  • Upload a custom log4j.properties with spark-submit, by adding it to the list of files specified with the "--files" option.
  • Add "-Dlog4j.configuration=<location of configuration file>" to "spark.driver.extraJavaOptions" (for the driver) or "spark.executor.extraJavaOptions" (for executors). Note that if using a file, the "file:" protocol should be explicitly provided, and the file needs to exist locally on all the nodes.

Note that with the first option, the executors and the application master share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).

If you need a reference to the proper location to place log files, use "spark.yarn.app.container.log.dir" in your log4j.properties, e.g. log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log. For streaming applications, configure a RollingFileAppender and point it at this log location to avoid filling up the disk with large log files.
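
A hedged sketch of the first option combined with the advice above (the appender name file_appender and the rolling sizes are arbitrary illustrative choices):

# a minimal log4j.properties that rolls logs inside YARN's container log directory
cat > log4j.properties <<'EOF'
log4j.rootCategory=INFO, file_appender
log4j.appender.file_appender=org.apache.log4j.RollingFileAppender
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.file_appender.MaxFileSize=50MB
log4j.appender.file_appender.MaxBackupIndex=5
log4j.appender.file_appender.layout=org.apache.log4j.PatternLayout
log4j.appender.file_appender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
EOF

# ship it with the application so the driver and executors pick it up
./bin/spark-submit --class path.to.your.Class \
  --master yarn-cluster \
  --files log4j.properties \
  <app jar> [app options]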

Important Notes

  • Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the number of cores given via command line arguments is not passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
  • The local directories used by Spark executors are the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it is ignored.
  • The --files and --archives options support specifying file names with the # syntax, similar to Hadoop. For example, you can specify --files localtest.txt#appSees.txt: this uploads your local file localtest.txt into HDFS, but it will be linked to by the name appSees.txt, and your application should use the name appSees.txt to reference it when running on YARN (see the sketch after this list).
  • The --jars option allows the SparkContext.addJar function to work with local jar files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
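
For example, a sketch of the # renaming described in the third note above (localtest.txt and appSees.txt are just the illustrative names from that note):

# localtest.txt is uploaded to HDFS, but the running application sees it as appSees.txt
./bin/spark-submit --class path.to.your.Class \
  --master yarn-cluster \
  --files localtest.txt#appSees.txt \
  <app jar> [app options]
# inside the application, open the file by its link name, "appSees.txt"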

This article is translated from the official Spark documentation: Running Spark on YARN.