Spark Standalone Mode

目录 [−]

  1. 在集群上安装Spark Standalone
  2. 手工启动集群
  3. 集群启动脚本
  4. 连接应用到集群
  5. 启动编译的Spark应用
  6. 资源调度
  7. 监控和日志
  8. 和Hadoop集成
  9. 为网络安全配置端口
  10. 高可用
    1. 使用ZooKeeper建立master备用机
      1. 简介
      2. 配置
      3. 细节
    2. 使用本地文件实现单节点恢复
      1. 简介
      2. 配置
      3. 细节

除了在Mesos或YARN集群管理器运行,Spark还提供了一个简单的独立部署模式。你可以手动启动一个独立的集群,通过手动启动master和worker,或使用我们提供的启动脚本。也可以在一台机器上运行这些守护进程进行测试,。

在集群上安装Spark Standalone

要安装Spark独立模式,您只需将Spark的编译版本放到集群中的每个节点上。您可以获取Spark的每个编译版本或建立自己编译

手工启动集群

你可以通过下面的命令启动standalone master:

1
./sbin/start-master.sh

一旦启动, master会打印出它的URL spark://HOST:PORT, worker可以用它连接master, 或者作为“master” 参数传给SparkContext. 你也可以在master的WEB UI上找到这个URL, WEB UI缺省地址是 http://localhost:8080 .

类似地, 你可以启动一个或多个worker,并连接到master:

1
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

一旦你启动了worker, 你可以查看 master的 web UI (缺省是http://localhost:8080 )查看worker情况. 你会看到一个新的节点被列出来, 包括CPU核数和内存(减去为OS预留的1G).

(你可以访问worker的web UI了解它的执行情况, URL地址默认为http://:8081)

下面的配置参数可以传给master和worker。

参数 意义
-i IP, --ip IP IP address or DNS name to listen on
-p PORT, --port PORT Port for service to listen on (default: 7077 for master, random for worker)
--webui-port PORT Port for web UI (default: 8080 for master, 8081 for worker)
-c CORES, --cores CORES Total CPU cores to allow Spark applications to use on the machine (default: all available); only on worker
-m MEM, --memory MEM Total amount of memory to allow Spark applications to use on the machine, in a format like 1000M or 2G (default: your machine's total RAM minus 1 GB); only on worker
-d DIR, --work-dir DIR Directory to use for scratch space and job output logs (default: SPARK_HOME/work); only on worker

集群启动脚本

要用启动脚本启动一个Spark standalone集群,你需要在Spark目录下创建一个名为conf/slaves,其中应该包含所有机器的主机名(hostname),每行一个机器主机名。master必须能够通过无密码ssh访问每个从机(使用私钥)。为了进行测试,你可以把localhost加到这个文件中。

一旦创建好文件, 你就可以使用下面的shell脚本启动或者停止你的集群了。 这些脚本基于Hadoop的发布脚本, 可以在SPARK_HOME/bin找到:

  • sbin/start-master.sh - 在脚本执行的机器上启动master.
  • sbin/start-slaves.sh - 启动conf/slaves 文件中配置的所有的slave.
  • sbin/start-all.sh - 启动上面描述的master和salve.
  • sbin/stop-master.sh - 停止bin/start-master.sh 脚本启动的master.
  • sbin/stop-slaves.sh - 停止conf/slaves 文件中配置的slave.
  • sbin/stop-all.sh - 停止上面描述的master和slave.

注意这些脚本必须在你想运行的master机器上执行,而不是你的本地机。

你还可以通过在conf/spark-env.sh文件中设置环境变量来配置集群. 可以参考conf/spark-env.sh.template模版, 把它复制到你所有的worker机器上才起作用. 下面的设置项可用:

环境变量 意义
SPARK_MASTER_IP Bind the master to a specific IP address, for example a public one.
SPARK_MASTER_PORT Start the master on a different port (default: 7077).
SPARK_MASTER_WEBUI_PORT Port for the master web UI (default: 8080).
SPARK_MASTER_OPTS Configuration properties that apply only to the master in the form "-Dx=y" (default: none). See below for a list of possible options.
SPARK_LOCAL_DIRS Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks.
SPARK_WORKER_CORES Total number of cores to allow Spark applications to use on the machine (default: all available cores).
SPARK_WORKER_MEMORY Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that each application's individual memory is configured using its spark.executor.memory property.
SPARK_WORKER_PORT Start the Spark worker on a specific port (default: random).
SPARK_WORKER_WEBUI_PORT Port for the worker web UI (default: 8081).
SPARK_WORKER_INSTANCES Number of worker instances to run on each machine (default: 1). You can make this more than 1 if you have have very large machines and would like multiple Spark worker processes. If you do set this, make sure to also set SPARK_WORKER_CORES explicitly to limit the cores per worker, or else each worker will try to use all the cores.
SPARK_WORKER_DIR Directory to run applications in, which will include both logs and scratch space (default: SPARK_HOME/work).
SPARK_WORKER_OPTS Configuration properties that apply only to the worker in the form "-Dx=y" (default: none). See below for a list of possible options.
SPARK_DAEMON_MEMORY Memory to allocate to the Spark master and worker daemons themselves (default: 512m).
SPARK_DAEMON_JAVA_OPTS JVM options for the Spark master and worker daemons themselves in the form "-Dx=y" (default: none).
SPARK_PUBLIC_DNS The public DNS name of the Spark master and workers (default: none).

注意: 当前的启动脚本不支持Windows. 如果想在Windows运行 Spark集群,你需要手工启动master 和 worker.

SPARK_MASTER_OPTS 支持下面的系统属性:

属性名 默认值 意义
spark.deploy.retainedApplications 200 The maximum number of completed applications to display. Older applications will be dropped from the UI to maintain this limit.
spark.deploy.retainedDrivers 200 The maximum number of completed drivers to display. Older drivers will be dropped from the UI to maintain this limit.
spark.deploy.spreadOut true Whether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads.
spark.deploy.defaultCores (infinite) Default number of cores to give to applications in Spark's standalone mode if they don't set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default.
spark.worker.timeout 60 Number of seconds after which the standalone deploy master considers a worker lost if it receives no heartbeats.

SPARK_WORKER_OPTS 支持下面的系统属性:

属性名 默认值 意义
spark.worker.cleanup.enabled false Enable periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. Applications directories are cleaned up regardless of whether the application is still running.
spark.worker.cleanup.interval 1800 (30 minutes) Controls the interval, in seconds, at which the worker cleans up old application work dirs on the local machine.
spark.worker.cleanup.appDataTtl 7 24 3600 (7 days) The number of seconds to retain application work directories on each worker. This is a Time To Live and should depend on the amount of available disk space you have. Application logs and jars are downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, especially if you run jobs very frequently.

连接应用到集群

为了在集群中运行应用, 只需简单传master的 spark://IP:PORT 给SparkContext的构造函数.

为了在集群中运行交互式的Spark shell, 运行下面的命令:

1
./bin/spark-shell --master spark://IP:PORT

你也可以传参数--total-executor-cores 来控制spark-shell使用的集群的核数.

启动编译的Spark应用

spark-submit脚本提供了最直接的方式来提交一个编译的应用到集群中。 对于standalone集群, Spark目前支持两种部署模式。在client模式中,该驱动程序在提交应用的客户端同一进程中启动。然而在集群模式下,驱动是从集群中的的一个worker进程中的启动,客户端只要它完成提交申请就退出, 无需等待应用程序完成。

如果你的应用程序是通过Spark submit启动,那么应用程序的jar会自动分发到所有工作节点。对于任何你的应用依赖的其它的jar,你应该使用逗号作为分隔符通过--jars标志指定它们(如--jars jar1,jar2)。为了控制应用程序的配置或者执行环境,请参照Spark配置

资源调度

独立集群模式目前只支持跨应用程序的简单FIFO调度。然而,为了允许多个并发用户,你可以控制每个应用使用的资源的最大数。默认情况下,它会请求使用集群的全部的核。 如果你同时只运行一个应用程序这才有意义。可以在SparkConf中设置spark.cores.max核心数。例如:

1
2
3
4
5
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)

另外, 你可以在集群的master进程中配置spark.deploy.defaultCores, 这会改变没有设置spark.cores.max应用程序使用的缺省值。 在conf/spark-env.sh增加:

1
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

在共享集群的环境下当用户没有独自配置最大核数时这个参数很有用.

监控和日志

Spark standalone模式提供了一个web 接口,可以监控集群。 master和worker都有它们自己的web UI用来显示culster统计信息和job统计信息。 默认情况下master WEB UI使用8080端口, 这个端口可以在配置文件中修改,或者在命令行中传入.

另外, 每个job的详细日志也会写入到每个slave节点的work文件夹 (缺省为SPARK_HOME/work). 你会看到每个job有两个文件, stdout 和 stderr。 所有的输出写入到它们的控制台.

和Hadoop集成

你可以将Spark运行在Hadoop集群上, 只需在相同的机器上把它启动为一个服务。 如果想访问Hadoop的数据, 只需使用hdfs:// URL (典型地 hdfs://:9000/path, 你可以在你的Hadoop Namenode服务器的web UI上找到正确的地址). 或者, 你可以为Spark设置一个独立的集群, 仍然通过网络访问HDFS; 这会比本地磁盘访问要慢, 但是如果集群仍然在同一个局域网中这可能不会是一个问题 (比如你将Spark机器放在Haddop的机架上).

为网络安全配置端口

Spark非常引来网络, 有严格的防火墙设置需求。 它的网络端口列表可以参考 security page..

高可用

缺省情况下, standalone scheduling cluster容忍worker失败 (在这种情况下它可以将失败的任务转移给其它的worker)。 但是, 调度器使用master来做调度决定, 这会产生一个单点失败: 如果master崩溃,新的应用不会被创建。为了解决这个问题, 我们有两种高可用的解决方案,细节如下:

使用ZooKeeper建立master备用机

简介

ZooKeeper提供了leader选举和状态存储, 所以你可以在集群中启动多个Master,并连接到同一个ZooKeeper实例。 总有一个会被选为“leader” ,其它的作为standby. 如果当前的leader死了, 另一个master会被选举出来, 恢复老的master的状态, 继续调度. 整个恢复过程(从第一个leader宕机开始) 应该只花费一两分钟。 注意这只会延迟新的应用。 已经运行的应用不受影响.

可以查看 ZooKeeper文档.

配置

为了使用这种模式, 需要社会资 spark-env中的SPARK_DAEMON_JAVA_OPTS 参数, 参数值可以为以下的选项:

系统属性 意义
spark.deploy.recoveryMode Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark).

或许你已经知道: 如果你的集群有多个matser,但是未能正确master使用Zookeeper, master不能相互知道对方,所以它们认为自己都是leader。 这不会导致很严重的集群问题, 所有的master都独立调度。

细节

当你搭建好Zookeeper集群后, 使用高可用性非常简单。 简单的在不同的节点启动master进程,并使用Zookeeper配置 (ZooKeeper URL和目录)。 master可以随时增加或者移除。

为了调度一个新的应用或者增加新的worker到集群中,需要知道当前的leader的IP地址。 可以将master列表传入,就像以前只传一个master的信息。 例如, 你可以启动一个SparkContext,让它连接 spark://host1:port1,host2:port2. 这到让你的SparkContext尝试在这两个master上注册。 如果host1宕机,我们可以找到新的leader: host2.

"在一个master上注册"和正常操作有一个很重要的区别. 当启动时, 应用或者worker要能够发现lead master和在它上面注册。 纵然这是“in the system” (例如,存储在ZooKeeper). 当 failover发生时, 新的leader会联系先前注册的应用和worker,通知它们leader已改变,所以启动时它们甚至不需要知道新的master.

由于这个属性, 新的master可以随时加入。 你唯一需要担心的是新的应用和worker在新的master变成leader后能够发现它并在它上面注册。 一旦注册成功,就无需操劳了。

使用本地文件实现单节点恢复

简介

ZooKeeper 是最好的产品级的高可用方案, 但是如果你想在master宕机后重启它, 可以使用FILESYSTEM模式. 当应用和Worker注册时,它们将状态写入到提供的文件夹中。 当master进程重启时,可以根据这些状态恢复.

配置

为了使用这种恢复模式, 设置spark-env的SPARK_DAEMON_JAVA_OPTS 参数:

系统属性 意义
spark.deploy.recoveryMode Set to FILESYSTEM to enable single-node recovery mode (default: NONE).
spark.deploy.recoveryDirectory The directory in which Spark will store recovery state, accessible from the Master's perspective.

细节

  • 这个解决方案可以和进程监控/管理器合作, 比如monit, 或者通过重启手工恢复.
  • 尽管filesystem恢复模式看起来比直截了当没有做任何恢复要好,这种模式对某些开发或i这实现目的来讲可能不是最优的。特别是,通过stop-master.sh停止master不会清理其恢复状态,所以当你启动一个新的master,它会进入恢复模式。这可能会增加启动时间长达1分钟,如果它需要等待所有以前注册的worker/client超时的话。

  • 虽然NFS目录不是正式支持的特性,你可以挂载NFS目录作为恢复目录。如果原始的master完全死掉, 你可以启动一个不同的节点作为master,这将正确地恢复所有以前注册的worker/应用程序(相当于Zookeeper恢复)。后续的应用程序为了注册, 将不得不找到新的master。

翻译自 Spark Standalone Mode