5.4 RDD的 Action 操作
1. reduce(func)
通过func
函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
scala> val rdd1 = sc.parallelize(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.reduce(_ + _)
res0: Int = 5050
scala> val rdd2 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd2.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
res2: (String, Int) = (abc,6)
2. collect
以数组的形式返回 RDD 中的所有元素.
所有的数据都会被拉到 driver 端, 所以要慎用
3. count()
返回 RDD 中元素的个数.
4. take(n)
返回 RDD 中前 n 个元素组成的数组.
take 的数据也会拉到 driver 端, 应该只对小数据集使用
scala> val rdd1 = sc.makeRDD(Array(10, 20, 30, 50, 60))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
scala> rdd1.take(2)
res3: Array[Int] = Array(10, 20)
5. first
返回 RDD 中的第一个元素. 类似于take(1)
.
6. takeOrdered(n, [ordering])
返回排序后的前 n
个元素, 默认是升序排列.
数据也会拉到 driver 端
scala> val rdd1 = sc.makeRDD(Array(100, 20, 130, 500, 60))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24
scala> rdd1.takeOrdered(2)
res6: Array[Int] = Array(20, 60)
scala> rdd1.takeOrdered(2)(Ordering.Int.reverse)
res7: Array[Int] = Array(500, 130)
7. aggregate
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
aggregate
函数将每个分区里面的元素通过seqOp
和初始值进行聚合,然后用combine
函数将每个分区的结果和初始值(zeroValue
)进行combine
操作。这个函数最终返回的类型不需要和RDD中元素类型一致
注意:
zeroValue
分区内聚合和分区间聚合的时候各会使用一次.
scala> val rdd1 = sc.makeRDD(Array(100, 30, 10, 30, 1, 50, 1, 60, 1), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
scala> rdd1.aggregate(0)(_ + _, _ + _)
res12: Int = 283
scala> val rdd1 = sc.makeRDD(Array("a", "b", "c", "d"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at makeRDD at <console>:24
scala> rdd1.aggregate("x")(_ + _, _ + _)
res13: String = xxabxcd
8. fold
折叠操作,aggregate
的简化操作,seqop
和combop
一样的时候,可以使用fold
scala> val rdd1 = sc.makeRDD(Array(100, 30, 10, 30, 1, 50, 1, 60, 1), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24
scala> rdd1.fold(0)(_ + _)
res16: Int = 283
scala> val rdd1 = sc.makeRDD(Array("a", "b", "c", "d"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at makeRDD at <console>:24
scala> rdd1.fold("x")(_ + _)
res17: String = xxabxcd
9. saveAsTextFile(path)
作用:将数据集的元素以textfile
的形式保存到HDFS
文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用toString
方法,将它装换为文件中的文本
10. saveAsSequenceFile(path)
作用:将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。
11. saveAsObjectFile(path)
作用:用于将 RDD 中的元素序列化成对象,存储到文件中。
12. countByKey()
作用:针对(K,V)
类型的 RDD,返回一个(K,Int)
的map
,表示每一个key
对应的元素个数。
应用: 可以用来查看数据是否倾斜
scala> val rdd1 = sc.parallelize(Array(("a", 10), ("a", 20), ("b", 100), ("c", 200)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> rdd1.countByKey()
res19: scala.collection.Map[String,Long] = Map(b -> 1, a -> 2, c -> 1)
13. foreach(func)
作用: 针对 RDD 中的每个元素都执行一次func