Spark算子
1 Spark算子分类
- Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。
- Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。
- Action算子,这类算子会触发SparkContext提交作业。
类型 | 算子 |
---|---|
Value型Transformation算子 | map、flatMap、mapPartiions、groupBy、filter、distinct、sample、takesample、glom、union、cartesian、cache、persist、coalesce、repartition |
Key-Value型Transformation算子 | mapValues、combineByKey、reduceByKey、partitionBy、cogroup、join、leftOutJoin、rightOutJoin |
Actions算子 | foreach、saveAsTextFile、saveAsObjectFile、collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate |
2 map和flatmap
参见Spark算子:RDD基本转换操作(1)–map、flatMap、distinct
- map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
- flatmap:第一步和map一样,最后将所有的输出分区合并成一个。
3 mapPartiions
参见Spark算子:RDD基本转换操作(5)–mapPartitions
该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,当在映射的过程中不断的创建对象时,使用mapPartitions比map的效率要高很多。
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
4 groupBy和groupByKey
- groupBy:将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。
- groupByKey:对Key-Value形式的RDD的操作。与groupBy类似。但是其分组所用的key不是由指定的函数生成的,而是采用元素本身中的key。
5 union、intersection和subtract
参见Spark算子:RDD基本转换操作(4)–union、intersection、subtract
- union:就是将两个RDD进行合并,不去重。
- intersection:该函数返回两个RDD的交集,并且去重。
- subtract:返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。
6 sample和takeSample
参见Spark算子[15]:sample、takeSample 源码实例详解
- sample:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样。
- takeSample:takeSample()函数和sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组。该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver的内存中。
7 cache和persist
参见Spark源码之persist方法,cache方法以及StorageLevel
cache和persist都是用于将一个RDD进行缓存,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。
- cache:cache方法实际上调用了无参数的persist方法
- persist:可以根据情况设置缓存级别,默认存储级别是MEMORY_ONLY
8 coalesce和repartition
参见Spark算子:RDD基本转换操作(2)–coalesce、repartition
- coalesce:
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
- 该函数用于将RDD进行重分区,使用HashPartitioner;
- 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false。
-
repartition:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
- 该函数其实就是coalesce函数第二个参数为true的实现。
9 partitionBy、mapValues和flatMapValues
参见Spark算子:RDD键值转换操作(1)–partitionBy、mapValues、flatMapValues
-
partitionBy:该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
-
mapValues:同基本转换操作中的map,只不过mapValues是针对[K,V]中的V值进行map操作。
def mapValues[U](f: (V) => U): RDD[(K, U)]
-
flatMapValues:同基本转换操作中的flatMap,只不过flatMapValues是针对[K,V]中的V值进行flatMap操作。
def flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]
10 combineByKey
参见Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
使用用户设置好的聚合函数对每个Key中的Value进行组合(combine)。可以将输入类型为RDD[(K, V)]转成成RDD[(K, C)]
11 reduce和reduceByKey
-
reduce:根据映射函数f,对RDD中的元素进行二元计算,聚集数据集中的所有元素,然后返回计算结果。这是一个Action算子。
def reduce(f: (T, T) ⇒ T): T // 实例 val c = sc.parallelize(1 to 10) c.reduce((x, y) => x + y) // 结果55
-
reduceByKey:该函数用于将RDD[K,V]中每个K对应的V值根据映射函数来运算。
def reduceByKey(func: (V, V) => V): RDD[(K, V)] // 实例 val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6))) a.reduceByKey((x,y) => x + y).collect // 结果 Array((1,5), (3,10))
reduceByKey对每个键执行reduce,结果生成RDD; 它不是”action”操作,而是返回ShuffleRDD,是”transformation”。在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。
12 take和top
-
take用于获取RDD中从0到num-1下标的元素,不排序。
def take(num: Int): Array[T]
-
top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
13 first、count和collect
- first:返回RDD中的第一个元素,不排序。
- count:返回RDD中的元素数量。
- collect:用于将一个RDD转换成数组。
14 countByKey、foreach、foreachPartition和sortBy
- countByKey:用于统计RDD[K,V]中每个K的数量。
- foreach:用于遍历RDD,将函数f应用于每一个元素。
- foreachPartition:和foreach类似,只不过是对每一个分区使用f。
- sortBy:根据给定的排序k函数将RDD中的元素进行排序。
15 aggregate
参见Spark算子:RDD行动Action操作(3)–aggregate、fold、lookup
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate:用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。