Spark 架构概述

Spark 起源

Google 在 2003 年和 2004 年先后发表了 Google 文件系统 GFS 和 MapReduce 编程模型两篇文章,基于这两篇论文,2006 年 Hadoop 实现了 HDFS 和 MapReduce,Hadoop 成为了典型的大数据批量处理架构。由 HDFS 负责静态数据的存储,并通过 MapReduce 将计算逻辑分配到各数据节点进行数据计算。

而 Spark 则是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的分布式计算框架,用于大数据量下的迭代式计算,注 Spark 只是解决 Map-Reduce 计算慢和表达力低,而不是取代整个 Hadoop 生态,Spark 可以很好的兼容 Hive/Hdfs/HBase 这些存储组件。Spark 运算比 MapReduce 快的原因是因为 MapReduce 运算之后,会将数据的运算结果从内存写入到磁盘中,第二次 Mapredue 运算时在从磁盘中读取数据,所以其瓶颈在 2 次运算间的多余 IO 消耗。Spark 则是将数据一直缓存在内存中(shuffle 之前),直到计算得到最后的结果,再将结果写入到磁盘,所以多次运算的情况下,Spark 是比较快的。

具体区别如下,下面的章节将做详细的介绍:

  1. Map-Reduce 只提供了两种操作,表达力低不抽象。Spark 通过使用 RDD,提供了很多 transform 和 action。
  2. 一个 Job 只有 Map 和 Reduce 两个阶段,Job 之间的依赖关系需要开发者自行管理。而一个 Spark Job 可以包含多个 RDD 转换操作,只需要生成多个 Stage 即可,不同 Stage 之间的依赖关系 Spark 的 DAG 自动管理,开发者可以专注于业务逻辑的处理。
  3. Map-Reduce 对迭代数据处理性能差,每一次 Map 和 Reduce 的中间结果需要存放在 HDFS 磁盘中。而 Spark 同一个 Stage 中通过在内存缓存数据可以大大提高性能,内存不足时可以自动溢出到磁盘。
  4. Reduce 操作需要等待所有的 Map 操作完成后才能开始执行。Spark 分区相同的转换可以并行计算(窄依赖),只有分区不同(宽依赖)需要 shuffle 操作。

Spark 生态

  • Spark Core:包含了 Spark 的基础 API,比如对于 RDD 的操作 API,其他的 Spark 库也都是构建在 Spark Core 的基础上。
  • Spark Sql:包含了对于 Hive Sql 的操作,可以将 Hive Sql 转换成 Spark Rdd 操作。
  • Spark Streaming:提供了对于实时数据进行处理的方式。
  • MLib:包含了常用的机器学习算法实现,对于常见的分类和回归操作,可以对大量数据进分布式行迭代的操作。
  • GraphX:对图操作的工具集合。

p.s: 关于 Spark Sql/Spark Streaming/MLib 后续几篇文章在单独介绍,本文只先简单概述一下 Spark 的架构设计。

基础术语

在 Spark 中,被提交的程序叫做 Application,一个 Application 由多个 Job 组成,一个 Job 可以分解成多个 Stage,一个 Stage 又可以分解成多个 Task,然后将 Task 放在 Executor 进程上面运行。

  • Application:用户编写的 Spark 程序,可以理解为两部分,包含了一个 Driver Program 和集群中多个的 Executor。
  • Driver:Driver 可以理解为 Application 的 main 函数,会创建一个 SparkContext,SparkContext 负责与 Cluster Manager 通信,进行资源申请、任务的分配和监控等。
  • Cluster Manager:负责集群中资源的分配,目前主要有四种模式:
    • local:运行在本地非分布式部署。
    • Standalon:Spark 原生的资源管理。
    • Apache Mesos。
    • Yarn:Yarn 中的 ResourceManager,下文将对 Spark-On-Yarn 做详细介绍。
  • Worker:集群中可以运行 Application 代码的节点,在 Spark On Yarn 模式下可以理解为 NodeManager 节点。
  • Executor:运行在 Worker 上的一个进程,该进程可以负责多线程运行 task。
  • Job:一个 Application 由多个 Job 组成。
  • Stage:一个 Job 经过 DAG(DAGScheduler) 分解,可以拆分成多个 Stage。
  • Task:一个 Stage 可以拆解成多个 Task,然后放到 Executor 上面运行。

执行过程

  • 当一个 Spark 应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个 SparkContext,由 SparkContext 负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext 会向资源管理器注册并申请运行 Executor 的资源。
  • 资源管理器为 Executor 分配资源,并启动 Executor 进程,Executor 运行情况将随着“心跳”发送到资源管理器上。
  • SparkContext 根据 RDD 的依赖关系构建 DAG 图,DAG 图提交给 DAG 调度器(DAGScheduler)进行解析,将 DAG 图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor 向 SparkContext 申请任务,任务调度器将任务分发给 Executor 运行,同时,SparkContext 将应用程序代码发放给 Executor。
  • 任务在 Executor 上运行,把执行结果反馈给任务调度器,然后反馈给 DAG 调度器,运行完毕后写入数据并释放所有资源。

总结

总结而言,Spark 运行架构具有以下特点:

  • 每个应用都有自己专属的 Executor 进程,Executor 进程以多线程的方式运行任务,减少了多进程任务频繁的启动开销,使得任务执行变得非常高效和可靠;
  • Spark 运行过程与资源管理器无关,只要能够获取 Executor 进程并保持通信即可;
  • Executor 上有一个 BlockManager 存储模块,类似于键值存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写入到HDFS等文件系统,而是直接放在这个存储系统上,后续有需要时就可以直接读取;在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写 IO 性能;
  • 任务采用了数据本地性和推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。而且,Spark 采用了延时调度机制,可以在更大的程度上实现执行过程优化。比如,拥有数据的节点当前正被其他的任务占用,那么,在这种情况下是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为,如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么,调度就会等待,直到当前节点可用。

Spark On Yarn

Spark On Yarn 模式根据 Driver 在集群中的位置分为两种模式:一种是 Yarn-Client 模式,另外一种是 Yarn-Cluster 模式。

Yarn-Client 模式

  • Spark Yarn Client 向 Yarn 的 ResourceManager 申请启动 Application Master。同时在 SparkContext 中创建 DAGScheduler 和 TASKScheduler 等。
  • ResourceManager 收到请求后,在集群中选择一个 NodeManager,为该应用程序第一个 Container,要求它在这个 Container 中启动应用程序的 ApplicationMaster。
  • Client 中的 SparkContext 初始完毕后,与 ApplicationMaster 建立通讯,向 ResourceManager 注册,根据任务信息向 ResourceManager 申请 Container。
  • 一旦 ApplicationMaster 申请到资源,也就是 Container 以后,便与对应的 NodeManger 通信,要求它在获得的 Container 中启动 CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend 启动后会向 Client 中的 SparkContext 注册并申请 Task。
  • Client 中的 SparkContext 分配 Task 给 CoarseGrainedExecutorBackend 执行,CoarseGrainedExecutorBackend 运行 Task 并向 Driver 汇报运行状态和监控。
  • 应用程序完成后,Client 的 SparkContext 向 ResourceManager 申请注销并关闭自己。

Yarn-Cluster 模式

在 Yarn-Cluster 模式中,当用户向 Yarn 提交一个应用程序后,Yarn 将分两个阶段运用该 Application:

  1. 把 Spark 的 Driver 作为一个 ApplicationMaster 在 Yarn 集群中先启动。
  2. 由 ApplicationMaster 创建应用程序,然后为它向 ResourceManager 申请资源,并启动 Executor 来运行 Task,同时监控它的整个运行过程,直到运行完成。

在 YARN 中,每个 Application 实例都有一个 ApplicationMaster 进程,它是 Application 启动的第一个容器。它负责和 ResourceManager 打交道并请求资源,获取资源之后告诉 NodeManager 为其启动 Container。从深层次的含义讲 YARN-Cluster 和 YARN-Client 模式的区别其实就是 ApplicationMaster 进程的区别。

YARN-Cluster 模式下,Driver 运行在 Application Master 中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN 上运行,因而 YARN-Cluster 模式不适合运行交互类型的作业;YARN-Client 模式下,Application Master 仅仅向 YARN 请求 Executor,Client 会和请求的 Container 通信来调度他们工作,也就是说 Client 不能离开。

RDD

一个 RDD 就是一个只读分布式对象集合,一个 RDD 可以分成多个分区,每个分区就是一个数据集片段,不同的分区可以保存到不同的节点上。Spark RDD 可以分为两类:

  1. 转换操作(比如 map/filter/groupBy/join)接受 rdd 并返回 rdd。
  2. 行动操作(比如 count/collect)接受 rdd 但是不返回 rdd。Spark 中的 rdd 采用了惰性调用,真正的计算发生在行动操作,对于行动之前的转换操作,Spark 只是记录下转换操作的依赖关系。

宽依赖和窄依赖

  1. 窄依赖表示为,一个父 RDD 分区对应与一个子 RDD 分区,或者多个 RDD 分区对应一个子 RDD 分区。比如 map/filter/union。
  2. 宽依赖表示为,一个父 RDD 分区对应一个子 RDD 的多个分区。比如 groupByKey、sortByKey 等操作。

阶段划分

Spark 通过分析各个 RDD 的依赖关系生成了 DAG,再通过分析各个 RDD 中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在 DAG 中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算。

Cache

Spark 中对于一个 RDD 执行多次算子的默认原理是这样的:每次对一个 RDD 执行一个算子操作时,都会重新从源头处计算一遍,计算出那个 RDD 来,然后再对这个 RDD 执行你的算子操作。这种方式的性能是很差的。
因此对于这种情况,需要对多次使用的 RDD 进行持久化。

Spark 本身就是一个基于内存的迭代式计算,所以如果程序从头到尾只有一个 Action 操作且子 RDD 只依赖于一个父RDD 的话,就不需要使用 cache 这个机制,RDD 会在内存中一直从头计算到尾,最后才根据你的 Action 操作返回一个值或者保存到相应的磁盘中。需要 cache 的是当存在多个 Action 操作或者依赖于多个 RDD 的时候,可以在那之前缓存RDD。

1
2
3
4
5
val rdd = sc.textFile("path/to/file").Map(...).filter(...)
val rdd1 = rdd.Map(x => x+1)
val rdd2 = rdd.Map(x => x+100)
val rdd3 = rdd1.join(rdd2)
rdd3.count()

上面的代码中会形成如下的 DAG 图:

所以可以在 rdd 生成之后使用 cache 函数对 rdd 进行缓存,这样就不用再从头开始计算了,缓存之后过程如下:

  除了 cache 函数外,缓存还可以使用 persist,cache 是使用的默认缓存选项一般默认为Memory_only(内存中缓存),persist 则可以在缓存的时候选择任意一种缓存类型。

checkpoint

基于 RDD 的依赖关系,如果任意一个 RDD 在相应的节点丢失,你只需要从上一步的 RDD 出发再次计算,便可恢复该 RDD。但是,如果一个 RDD 的依赖链比较长,而且中间又有多个 RDD 出现故障的话,进行恢复可能会非常耗费时间和计算资源。而检查点(Checkpoint)的引入,就是为了优化这些情况下的数据恢复。很多数据库系统都有检查点机制,在连续的 transaction 列表中记录某几个 transaction 后数据的内容,从而加快错误恢复。

RDD 中的检查点的思想与之类似。在计算过程中,对于一些计算过程比较耗时的 RDD,我们可以将它缓存至硬盘或 HDFS 中,标记这个 RDD 有被检查点处理过,并且清空它的所有依赖关系。同时,给它新建一个依赖于 CheckpointRDD 的依赖关系,CheckpointRDD 可以用来从硬盘中读取 RDD 和生成新的分区信息。这样,当某个子 RDD 需要错误恢复时,回溯至该 RDD,发现它被检查点记录过,就可以直接去硬盘中读取这个 RDD,而无需再向前回溯计算。存储级别(Storage Level)是一个枚举类型,用来记录 RDD 持久化时的存储级别,常用的有以下几个:

  1. MEMORY_ONLY:只缓存在内存中,如果内存空间不够则不缓存多出来的部分。这是 RDD 存储级别的默认值。

  2. MEMORY_AND_DISK:缓存在内存中,如果空间不够则缓存在硬盘中。

  3. DISK_ONLY:只缓存在硬盘中。

  4. MEMORY_ONLY_2 和 MEMORY_AND_DISK_2 等:与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。

常用操作

转换操作

map 是最基本的转换操作。与 MapReduce 中的 map 一样,它把一个 RDD 中的所有数据通过一个函数,映射成一个新的 RDD,任何原 RDD 中的元素在新 RDD 中都有且只有一个元素与之对应。

1
2
rdd = sc.parallelize(["b", "a", "c"])
rdd2 = rdd.map(lambda x: (x, 1)) // [('b', 1), ('a', 1), ('c', 1)]

filter 这个操作,是选择原 RDD 里所有数据中满足某个特定条件的数据,去返回一个新的 RDD。如下例所示,通过 filter,只选出了所有的偶数。

1
2
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.filter(lambda x: x % 2 == 0) // [2, 4]

mapPartitions 是 map 的变种。不同于 map 的输入函数是应用于 RDD 中每个元素,mapPartitions 的输入函数是应用于 RDD 的每个分区,也就是把每个分区中的内容作为整体来处理的,所以输入函数的类型是 Iterator[T] => Iterator[U]。

1
2
3
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd2 = rdd.mapPartitions(f) // [3, 7]

groupByKey 和 SQL 中的 groupBy 类似,是把对象的集合按某个 Key 来归类,返回的 RDD 中每个 Key 对应一个序列。

1
2
3
4
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.groupByKey().collect()
//"a" [1, 2]
//"b" [1]

动作操作

RDD 中的动作操作 collect 与函数式编程中的 collect 类似,它会以数组的形式,返回 RDD 的所有元素。需要注意的是,collect 操作只有在输出数组所含的数据数量较小时使用,因为所有的数据都会载入到程序的内存中,如果数据量较大,会占用大量 JVM 内存,导致内存溢出。

1
2
rdd = sc.parallelize(["b", "a", "c"])
rdd.map(lambda x: (x, 1)).collect() // [('b', 1), ('a', 1), ('c', 1)]

与 MapReduce 中的 reduce 类似,它会把 RDD 中的元素根据一个输入函数聚合起来。

1
2
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).reduce(add) // 15

Count 会返回 RDD 中元素的个数。

1
sc.parallelize([2, 3, 4]).count() // 3

CountByKey

仅适用于 Key-Value pair 类型的 RDD,返回具有每个 key 的计数的 的 map。

1
2
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items()) // [('a', 2), ('b', 1)]

Shuffle

上面提到,RDD 的 Transformation 函数中,又分为窄依赖和宽依赖,窄依赖跟宽依赖的区别是是否发生 shuffle,宽依赖会发生 shuffle 操作。窄依赖是子 RDD 的各 partition 不依赖于其他 patition,能够独立计算得到结果宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。

1
2
3
4
5
// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
Map(x => (x._1, x._2.toList.length))

第一个 Map 操作将 RDD 里的各个元素进行映射,RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立并行计算。第二个 groupby 之后的 Map 操作,为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布即 shuffle 操作。

shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle。宽依赖主要有两个过程: shuffle write 和 shuffle fetch。类似 Hadoop 的 Map 和 Reduce 阶段,shuffle write 将 ShuffleMapTask 任务产生的中间结果缓存到内存中,shuffle fetch 获得 ShuffleMapTask 缓存的中间结果进行 ShuffleReduceTask 计算,这个过程容易造成OutOfMemory。

shuffle 过程内存分配使用 ShuffleMemoryManager 类管理,会针对每个 Task 分配内存,Task 任务完成后通过 Executor 释放空间,可以把 Task 理解成不同 key 的数据对应一个 Task。早期的内存分配机制使用公平分配,即不同 Task 分配的内存是一样的,但是这样容易造成内存需求过多的 Task 的 OutOfMemory,从而造成多余的 磁盘 IO 过程,影响整体的效率。比如某一个 key 下的数据明显偏多,但因为大家内存都一样,这一个 key 的数据就容易 OutOfMemory。1.5 版以后 Task 共用一个内存池,内存池的大小默认为 JVM 最大运行时内存容量的16%,分配机制如下:

假如有 N 个 Task,ShuffleMemoryManager 保证每个 Task 溢出之前至少可以申请到1/2N 内存,且至多申请到1/N,N 为当前活动的 shuffle Task 数。因为N 是一直变化的,所以 manager 会一直追踪 Task 数的变化,重新计算队列中的1/N 和1/2N。但是这样仍然容易造成内存需要多的 Task 任务溢出。

运行到每个 stage 的边界时,数据在父 stage 中按照 Task 写到磁盘上,而在子 stage 中通过网络按照 Task 去读取数据。这些操作会导致很重的网络以及磁盘的I/O,所以 stage 的边界是非常占资源的,在编写 Spark 程序的时候需要尽量避免的 。父 stage 中 partition 个数与子 stage 的 partition 个数可能不同,所以那些产生 stage 边界的 Transformation 常常需要接受一个 numPartition 的参数来觉得子 stage 中的数据将被切分为多少个 partition7。 shuffle 操作的时候可以用 combiner 压缩数据,减少 IO 的消耗。

参数调优

下面的这些参数主要在 spark-submit 的时候指定,可以通过 spark-submit –help 查看。

  1. num-Executors: 该参数用于设置 Spark 作业总共要用多少个 Executor 进程来执行。这个参数非常重要,如果不设置的话,默认只会给你启动少量的 Executor 进程,此时你的 Spark 作业的运行速度是非常慢的。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

  2. Executor-memory: 该参数用于设置每个 Executor 进程的内存。Executor 内存的大小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM 异常,也有直接的关联。

  3. Executor-cores: 用于设置每个 Executor 进程的 CPU core 数量。这个参数决定了每个 Executor 并行执行 Task 线程的能力。每个 core 同一时间只能执行一个 Task 线程,因此每个 Executor 的 core 越多,越能够快速地执行完分配给自己的所有 Task 线程。

  4. driver-memory: 该参数用于设置 Driver 进程的内存。如果需要使用 collect 算子将 RDD 的数据全部拉取到 Driver 上进行处理,那么必须确保 Driver 的内存足够大,否则会出现 OOM 内存溢出的问题。

  5. spark.default.parallelism: 该参数用于设置每个 stage 的默认 Task 数量。这个参数极为重要,如果不设置可能会直接影响你的 Spark 作业性能。如果不去设置这个参数,那么就会导致 Spark 自己根据底层 HDFS 的 block 数量来设置 Task 的数量,默认是一个 HDFS block 对应一个 Task。通常来说,Spark 默认设置的数量是偏少的(比如几十个 Task),如果 Task 数量偏少的话,就会导致你前面设置好的 Executor 的参数都前功尽弃。即无论你的 Executor 进程/内存/CPU有多大,但是 Task 只有几个,那么 90% 的 Executor 进程可能根本就没有 Task 执行,也就白白浪费了资源此 Spark 官网建议的设置原则是,设置该参数为 num-Executors * Executor-cores 的 2~3 倍较为合适。

  6. spark.storage.memoryFrAction: 该参数用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6。也就是说,默认 Executor 60% 的内存,可以用来保存持久化的 RDD 数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。如果 Spark 作业中,有较多的 RDD 持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果 Spark 作业中的 shuffle 类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。如果发现作业由于频繁的gc导致运行缓慢(通过 Spark web ui 可以观察到作业的 gc 耗时),意味着 Task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。

  7. spark.shuffle.memoryFrAction: 该参数用于设置 shuffle 过程中一个 Task 拉取到上个 stage 的 Task 的输出后,进行聚合操作时能够使用的 Executor 内存的比例,默认 20%。shuffle 操作在进行聚合时,如果使用的内存超出 20% 的限制,多余的数据就会溢写到磁盘,此时会极大地降低性能。如果 Spark 作业中的 RDD 持久化操作较少,shuffle 操作较多时,建议降低持久化操作的内存占比,提高 shuffle 操作的内存占比比例,避免 shuffle 过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的 gc 导致运行缓慢,意味着 Task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。