5.3.3 Key-Value Types (Part 2)

7. combineByKey[C]

Function declaration:

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  // an overload additionally accepts partitioner, mapSideCombine and serializer parameters
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
  1. Purpose: for each key K, merge the values V into a combined result of type C, producing an RDD[(K, C)].

  2. Parameters:

    • createCombiner: combineByKey traverses the key-value pairs in each partition. The first time a key is seen in a partition, createCombiner is called on that key's value to produce an initial combined value of type C. (It is not called again for later occurrences of that key.)

    • mergeValue: called when a key has already been seen in the same partition; merges the key's current combined value C with the new value V. This is the within-partition merge.

    • mergeCombiners: merges the combined values (C) of the same key across partitions. This is the cross-partition merge.

  3. Examples (the average-per-key case below, followed by a wordcount sketch)

Requirement 1: create a pair RDD and compute, for each key, the average of its values. (First compute each key's occurrence count and the sum of its values, then divide the sum by the count.)

Analysis: see the accompanying slides.

scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24
// acc is the within-partition accumulator: (sum of this key's values so far, number of times the key has appeared)
// acc1, acc2 are the per-partition accumulators being merged across partitions
scala> input.combineByKey((_, 1), (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1:(Int, Int), acc2: (Int, Int))=> (acc1._1 + acc2._1, acc1._2 + acc2._2))
res10: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[7] at combineByKey at <console>:27

scala> res10.collect
res11: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))

scala> res10.map(t => (t._1, t._2._1.toDouble / t._2._2)).collect
res12: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
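
The examples item above also mentions wordcount; below is a minimal wordcount sketch with combineByKey (the input RDD words is assumed for illustration): createCombiner keeps the first 1, and both merge functions simply add counts.

// wordcount via combineByKey; `words` is a hypothetical input RDD
val words = sc.parallelize(Array("a", "b", "a", "c", "b", "a"), 2)
val counts = words.map((_, 1)).combineByKey(
  (v: Int) => v,                  // first occurrence of a key in a partition: count starts at its 1
  (c: Int, v: Int) => c + v,      // within-partition merge: add the new 1 to the running count
  (c1: Int, c2: Int) => c1 + c2)  // cross-partition merge: add the partial counts
counts.collect  // expected contents (order may vary): Array((a,3), (b,2), (c,1))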

Compare the differences and connections among the functions that aggregate by key; a comparison sketch follows.
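
A rough sketch of the relationship, assuming an RDD[(String, Int)] named pairs: all four operators aggregate by key (in Spark they are all built on combineByKeyWithClassTag); they differ in whether a zero value exists and whether the within- and cross-partition functions may differ. All four calls below produce the same result.

val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)), 2)
// reduceByKey: no zero value; one function is used both within and across partitions
pairs.reduceByKey(_ + _)
// foldByKey: like reduceByKey, but each key starts from a zero value in every partition
pairs.foldByKey(0)(_ + _)
// aggregateByKey: zero value plus separate within-partition (seqOp) and cross-partition (combOp) functions
pairs.aggregateByKey(0)(_ + _, _ + _)
// combineByKey: the most general; the initial value is computed from the first value, not a constant zero
pairs.combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
// each of the above yields Array((a,4), (b,2)) after collect (order may vary)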


8. sortByKey

Purpose: called on a (K, V) RDD; K must implement the Ordered[K] trait (or an implicit Ordering[K] value must be in scope). Returns a (K, V) RDD sorted by key.

scala> val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.sortByKey()
res25: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at sortByKey at <console>:27

scala> res25.collect
res26: Array[(Int, String)] = Array((1,a), (4,d), (10,b), (10,e), (11,c), (20,d))

scala> rdd.sortByKey(true).collect
res27: Array[(Int, String)] = Array((1,a), (4,d), (10,b), (10,e), (11,c), (20,d))

// descending order
scala> rdd.sortByKey(false).collect
res28: Array[(Int, String)] = Array((20,d), (11,c), (10,b), (10,e), (4,d), (1,a))
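
When the default ordering is not what you want, you can put your own implicit Ordering[K] in scope. A minimal sketch, reusing rdd from above (the ordering byStringForm is an illustrative assumption), that sorts the Int keys by their string form:

// custom key ordering: compare Int keys as strings, so "4" sorts after "20"
implicit val byStringForm: Ordering[Int] = Ordering.by(_.toString)
rdd.sortByKey().collect
// expected contents: Array((1,a), (10,b), (10,e), (11,c), (20,d), (4,d)) -- order of equal keys may vary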

9. mapValues

Purpose: on an RDD of (K, V) pairs, operate only on the values V, leaving the keys untouched. Because the keys cannot change, mapValues preserves the parent RDD's partitioner, unlike map; see the sketch after the example.

scala> val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd.mapValues("<" + _ + ">").collect
res29: Array[(Int, String)] = Array((1,<a>), (10,<b>), (11,<c>), (4,<d>), (20,<d>), (10,<e>))
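
A minimal sketch of why this matters, reusing rdd from above: mapValues keeps the parent's partitioner, while an equivalent map discards it (so a later join or reduceByKey may shuffle again).

import org.apache.spark.HashPartitioner
val partitioned = rdd.partitionBy(new HashPartitioner(2))
partitioned.mapValues("<" + _ + ">").partitioner            // Some(HashPartitioner) -- preserved
partitioned.map(t => (t._1, "<" + t._2 + ">")).partitioner  // None -- map may change keys, so it is dropped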

10. join(otherDataset, [numTasks])

Inner join:

Called on RDDs of types (K, V) and (K, W); returns an RDD of (K, (V, W)) that pairs up all elements sharing the same key.

scala> var rdd1 = sc.parallelize(Array((1, "a"), (1, "b"), (2, "c")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> var rdd2 = sc.parallelize(Array((1, "aa"), (3, "bb"), (2, "cc")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd1.join(rdd2).collect
res2: Array[(Int, (String, String))] = Array((2,(c,cc)), (1,(a,aa)), (1,(b,aa)))

Notes:

  • If one RDD contains a key several times, each occurrence is combined with every element of the other RDD that has the same key.

  • The outer joins are also supported: leftOuterJoin, rightOuterJoin, and fullOuterJoin; see the sketch below.
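
A minimal sketch of the outer joins, reusing rdd1 and rdd2 from above (element order in the output may vary): missing matches are represented with Option.

rdd1.leftOuterJoin(rdd2).collect
// => Array((1,(a,Some(aa))), (1,(b,Some(aa))), (2,(c,Some(cc)))): every key of rdd1 is kept
rdd1.fullOuterJoin(rdd2).collect
// => additionally contains (3,(None,Some(bb))); the value type becomes (Option[String], Option[String])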

11. cogroup(otherDataset, [numTasks])

Purpose: called on RDDs of types (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])), grouping each key's values from both RDDs.

scala> val rdd1 = sc.parallelize(Array((1, 10),(2, 20),(1, 100),(3, 30)),1)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array((1, "a"),(2, "b"),(1, "aa"),(3, "c")),1)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> rdd1.cogroup(rdd2).collect
res9: Array[(Int, (Iterable[Int], Iterable[String]))] = Array((1,(CompactBuffer(10, 100),CompactBuffer(a, aa))), (3,(CompactBuffer(30),CompactBuffer(c))), (2,(CompactBuffer(20),CompactBuffer(b))))
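
cogroup is the building block for the join family: for each key, an inner join is just the Cartesian product of the two grouped value collections. A minimal sketch, reusing rdd1 and rdd2 from above:

// deriving an inner join from cogroup
rdd1.cogroup(rdd2).flatMapValues {
  case (vs, ws) => for (v <- vs; w <- ws) yield (v, w)
}.collect
// expected contents (order may vary):
// Array((1,(10,a)), (1,(10,aa)), (1,(100,a)), (1,(100,aa)), (2,(20,b)), (3,(30,c)))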