Spark—RDD理解

Spark编程模型就是RDD(Resilient distributed dataset)弹性分布式数据集,他是MapReduce编程模型的拓展和延伸,但是他解决了MapReduce的痛点,也就是计算任务的结果如何进行高效的共享。

Spark使用内存计算的模式以及类似与Mapreduce的模型,使得大数据的并行计算得到了提升。而Spark基于RDD模型也同样实现了多类模型计算,如迭代计算、交互式sql、流式数据计算等。

抽象理解

RDD是 Spark 中的最基础的抽象,代表一个不可变、可分区、可以被并行计算的元素集合,被计算的 RDD全部的缓存在内存中。

关于RDD的详细信息,先从Spark源码中的注释开始说起。在源码中提到了五点属性:

1
2
3
4
5
6
7
8
9
10
/**
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*/

分区列表

这也是 Spark 实现并行计算的基础,通过对 RDD 的 partition 使一个 RDD 的数据集合分布在整个 spark 任务集群中,通过调度器进行任务的调度。

一个RDD包含了多个分区,每个分区包含了RDD的部分数据,一个分区也叫做一个partition,这个partition可以在创建RDD时指定,也可以使用默认值。在spark的任务执行阶段每个partition会被当做一个Task处理,关于Task是什么,后续详解。

针对每一个 partition 都有一个函数进行计算

spark中关于rdd的计算是针对每一个partition的,每个rdd都会有一个compute函数进行计算,compute函数内部使用迭代器方式。

RDD 会对其他的 RDD 产生依赖

这其中包括了宽依赖和窄依赖,正是因为依赖关系,形成了 RDD的 lineage,这也是 RDD 容错的基础

可选,针对 RDD 的 [key,value]结构,可以选择指定针对 key 的分区策略

也就是指定分区器,在 Spark 中提供了几种不同的分区器(e.g. HashPartitioner)。

可选,计算每个partition的首选位置列表(例如,HDFS文件的块位置)

主要是为了提高数据本地性,减少数据块的网络传输。

总的来说RDDspark内部,只读的支持分区及容错(通过lineage),可以并行计算的基于内存数据结构。

实现理解

RDD 诞生于 SparkContext,之前介绍了 SparkContext 的环境初始化工作(e.g 初始化DAGScheduler、TaskScheduler等),这只是他两个主要工作中的一个,另一个就是负责创建 RDD。
在 SparkContext 的伴生类中,实现了所有创建 RDD 的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
.
.
.

进入 RDD 内部,RDD 作为一个抽象类,与其子类构成了我们熟知的模板方法模式,也就是说RDD 抽象类内部定义了基础方法
交由其子类实现,模板方法在抽象类内部实现,同时还包含了一些 hook 方法.这么做既保证了他的基础操作不会被随意更改,同时在面对新的RDD 结构添加进来时更方便拓展.

RDD算子

transformation(转换)

根据已经存在的RDD转换生成一个新的RDD,转换算子都是lazy操作,并不会马上执行,只有在遇到action算子时才会被真正的执行。

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
coalesce(numPartitions) 减少 RDD 的分区数到指定值。
repartition(numPartitions) 重新给 RDD 分区
repartitionAndSortWithinPartitions(partitioner) 重新给 RDD 分区,并且每个分区内以记录的 key 排序

action(动作)

将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中,action算子会触发任务的执行。

动作 含义
reduce(func) reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func
foreachPartition(func) 在数据集的每一个分区上,运行函数func

接下来我们以简单的transformation算子map操作为例详细分析:

1
2
3
val context = new SparkContext(conf)
val listRdd = Array(1, 2, 3, 5, 8, 6)
context.parallelize(listRdd).map(_ * 2).foreach(println(_))
  • 首先在context创建后,Spark 使用parallelize根据本地的集合创建了一个平行化的集合 RDD ,叫做 ParallelCollectionRDD 同样他要是继承自 RDD。

  • 接下来对创建好的 RDD 调用抽象类中的模板方法map(), 这样就构建出一个 RDD 子类 MapPartitionsRDD.

1
2
3
4
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
  • 接下来进入MapPartitionsRDD内部,可以看到他覆写了 RDD 中的几个基础方法,同样在其他的 RDD 的子类中也都是同样的方式进行了定制化.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions

// 这里主要介绍compute()这个方法,这是在执行计算阶段最重要的方法.是所有 Executor 执行的基础

override def compute(split: Partition, context: TaskContext): Iterator[U] =
//用于将(taskContext、partition index、iterator)的元组映射到输出迭代器的函数。
f(context, split.index, firstParent[T].iterator(split, context))

override def clearDependencies() {
}
@transient protected lazy override val isBarrier_ : Boolean =
override protected def getOutputDeterministicLevel = {
}

走到这里Transform 阶段已经完成,当然在实际使用过程中,肯定还包含了,其他复杂度转换操作,例如flatmap,filter,distinct等操作.不过当前任务并没有提交执行,在RDD 源码注释中可以看到这句话launch a job to return a value to the user program,也就是说在 Spark 中,所有的执行任务都是在调用了 Action 操作后才会开始执行的.以例子中的.foreach(println(_))为例.

  • foreach 方法也就是所谓的 Action Method,RDD 类中还包含了collect,subtract等一些方法.foreach()将我们传递进去的函数,应用在当前这个 RDD 中的所有元素上,并将这个操作作为一个 job 进行提交给DAGSchedulerDAGScheduler将起划分为一个一个的stage(这其中需要记住stage划分算法),再将每一个stage转化为TaskSet,交由TaskScheduler处理,之后由TaskScheduler的后台进程SchedulerBackend将一个个的task(task分配算法)提交给 Spark 分配的 Executor 进行执行任务!
1
2
3
4
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

总结

Spark正是以RDD为数据模型,完成了整个分布式的内存计算。对比来看Spark相较于Hadoop的优点一部分原因取决于RDD的使用。

  • 支持多种操作的算子,使得spark程序编写起来更加方便。
  • 基于内存,RDD之间进行转换不再需要进行落地。减少了大部分的io。
  • RDD的依赖关系及checkpoint的使用保证的spark计算过程中的容错。