1 Spark on Kubernetes

两个星期前(08/15/2017),spark社区提了一个新的SPIP(Spark Project Improvement Proposals): Spark on Kubernetes: Kubernetes as A Native Cluster Manager,即用k8s管理spark集群。经过社区2个星期的投票,看上去很快要能合入了。

喜大普奔啊!

我场的spark为了支持多租户也是跑在k8s上的,但其实是一个太监的方案:在k8s上跑的只是spark driver,而spark excutor是跑在yarn上的。但这个方案有2个局限:

  1. 由于yarn会将driver的listen socket信息带给excutor,让excutor反过来访问该listen socket,所以k8s集群和yarn集群必须跑在同一套服务器上,否则yarn上的container将无法访问k8s集群内部的容器。
  2. 由于k8s和yarn在同一套服务器上,即存在2个集群管理者,因此也必然存在资源竞争:究竟k8s是大房,还是yarn做小妾?在我们的集群上就出现过,yarn上跑的excutor负载过重,导致k8s上的容器出现故障的现象。当然可以在k8s和yarn之外再加一个管理者(齐人有一妻一妾),由它来统筹k8s和yarn,但终究是复杂了一点。

该SPIP合入Spark后,Spark将支持k8s作为集群管理器,可以解决上述问题:集群只有1个管理者,Spark与其他跑在k8s上的app并无二致,都是普通app而已,都要接受k8s的管理。

2 Goals

  • Make Kubernetes a first-class cluster manager for Spark, alongside Spark Standalone, Yarn, and Mesos.
  • Support both client and cluster deployment mode.
  • Support dynamic resource allocation.
  • Support Spark Java/Scala, PySpark, and Spark R applications.
  • Support secure HDFS access.
  • Allow running applications of different Spark versions in the same cluster through the ability to specify the driver and executor Docker images on a per-application basis.
  • Support specification and enforcement of limits on both CPU cores and memory.

基本上能够替换yarn了,而且能得到若干额外的好处,例如多版本Spark同时运行。特别是支持secure HDFS,好评。

3 设计思路

Spark的driver和excutor都运行在Pod中。driver会调用k8s的api来创建和销毁Excutor pod,而k8s则负责将Pod调度到合适的Node上执行。

3.1 cluster & client

spark on k8s支持cluster和client两种模式。

  • 在cluster模式下,driver运行在pod中,spark-submit脚本会启动一个k8s client,该client负责调用k8s api。由于driver也运行在pod中,所以集群中的excutor容器可以直接访问。
  • 在客户端模式下,driver在集群外部运行,并调用Kubernetes API来创建和销毁执行器Pods。driver必须可以从集群中路由,以便excutor容器与之进行通信。

driver中运行的主要组件是KubernetesClusterSchedulerBackend ,它是CoarseGrainedSchedulerBackend的一个实现,它通过调用方法doRequestTotalExecutors 和doKillExecutors 分别管理通过Kubernetes API分配和销毁执行程序。

3.2 反压

在KubernetesClusterSchedulerBackend内,单独的kubernetes-pod-allocator线程通过适当的调节和监视来处理新的Excutor Pods的创建。该线程会根据先前的excutor Pod创建请求是否已完成,决定是否k8s提交新的请求。

这种反压是必要的,因为Kubernetes API server乐观地接受对新Pods的请求,期望能够最终安排它们运行。

然而,k8s集群内有大量暂时无资源调度的Pods并不是个好事情。反压机制使我们能够控制应用程序扩展速度(可配置),并有助于防止Spark应用程序使用太多Pod创建请求,对Kubernetes API server造成DOS攻击

3.3 依赖文件存储

spark on k8s有辅助和可选组件:ResourceStagingServer 和KubernetesExternalShuffleService ,它们具体描述如下。

ResourceStagingServer 用作文件存储(如果没有Kubernetes持久化存储)。client会将application依赖上传到ResourceStagingServer,而driver和excutor pod中的 init-containers 会从ResourceStagingServer下载依赖。

ResourceStagingServer是一个JAX-RS的jetty server,分别具有用于上传和下载文件的两个端点。上传响应中返回安全令牌,并且必须在下载文件的请求中携带该令牌。

该ResourceStagingServer 部署为Kubernetes的service+deployment,并且可以在同一集群中部署多个实例。Spark应用程序通过配置属性指定要使用哪个ResourceStagingServer 实例。

3.4 动态资源分配

KubernetesExternalShuffleService 用于支持动态资源分配:Spark应用程序的Excutor的数量可以在运行时根据资源需求动态改变。

它为驱动程序提供了一个额外的端点,允许shuffle service删除driver程序,终止并清理与相应application相关联的随机文件。

有两种部署KubernetesExternalShuffleService的方法:或者使用DaemonSet在每个节点(或者一部分节点)中运行shuffle service,或者在每个执行器Pods中运行shuffle service。

在第一个选项中,每个 shuffle service 容器都装载一个hostPath卷。每个Excutor容器也安装相同的hostPath卷,该容器也必须将环境变量SPARK_LOCAL_DIRS 指向hostPath。

在第二个选项中, shuffle service 与每个Excutor Pod中的Excutor容器位于同一位置。两个容器共享一个emptyDir卷,用以写入shuffle数据。在集群中部署的 shuffle service 可能有多个,可能用于不同版本的Spark,或者用于资源配额不同的优先级别。

3.5 k8s配置项

Spark on k8s还引入了新的Kubernetes配置选项,以便于指定driver和excutor Pods的Kubernetes资源。例如,driver和excutor Pods可以在特定的Kubernetes命名空间中以及集群中特定的一组节点上创建。允许用户将label和注释应用于driver和excutor Pods。

下面例子里我指定了spark.kubernetes.driver.limit.coresspark.kubernetes.executor.limit.cores

另外,HDFS的安全性正在积极地进行设计。利用内置的Kubernetes Secrets,可以支持短期运行作业和长期运行作业。

4 跑下试试

试试就试试。

要运行Spark on k8s,需要先满足如下条件。

  • k8s集群。a piece of cake
  • 具备创建pods, configmaps, secrets的权限。a piece of cake
  • 支持k8s的spark distribution。可以从这里下载,也可以自己编译。当然你要是下载不了(aws s3你懂的),可以给我mail,一块钱一份。
  • 相关spark on k8s镜像:spark-driver/spark-excutor/spark-init
Component Image
Spark Driver Image kubespark/spark-driver:v2.2.0-kubernetes-0.3.0
Spark Executor Image kubespark/spark-executor:v2.2.0-kubernetes-0.3.0
Spark Initialization Image kubespark/spark-init:v2.2.0-kubernetes-0.3.0

向k8s提交一个Pi作业。--master指定了k8s的api server,可以通过kubectl clusterinfo查看Kubernetes master。jar文件是在容器里的,路径不要改。不过官方文档里写的是spark_examples_2.11-2.2.0.jar,这个名字是错的,按下面的命令即可。

bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi  \
  --master k8s://http://127.0.0.1:8080  \
  --kubernetes-namespace default    \
  --conf spark.executor.instances=3    \
  --conf spark.app.name=spark-pi    \
  --conf spark.kubernetes.driver.limit.cores=1   \
  --conf spark.kubernetes.executor.limit.cores=1  \
  --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.3.0  \
  --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.3.0  \
  --conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.3.0  \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.3.0.jar

不过这种离线作业,在执行完了以后只是删除了Excutor Pod,但并没有删除driver Pod,仍然还在running,并不友好。如果确实想设计为干完不走,driver pod可以用Job类型,但这个行为跟Yarn又不一致了(Spark on Yarn执行完了所有进程都退出),所以还是删除的好,只是这执行日志,哎呀,还是得找个好地方存起来呀。