10.2.3 Interaction Between RDD and DataFrame
1. From RDD to DataFrame
When converting between RDD, DataFrame, and Dataset, you need the import: import spark.implicits._
Here spark is not a package name but the name of the SparkSession object, so the SparkSession instance must be created before the import can appear. implicits is an object defined inside SparkSession.
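A minimal sketch of the required order (the appName here is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ImplicitsDemo")
  .getOrCreate()
// the import refers to the spark object above, not a package,
// so it can only be written after that object exists
import spark.implicits._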
First, create an RDD:
scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24
Convert manually:
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26
// When converting to a DataFrame, specify each column name by hand
scala> rdd2.toDF("name", "age").show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
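If no column names are passed to toDF, the tuple accessors are used as column names instead. A quick hedged check (output shown as expected on Spark 2.x):

scala> rdd2.toDF.show   // columns default to _1 and _2
+-------+--+
|     _1|_2|
+-------+--+
|Michael|29|
|   Andy|30|
| Justin|19|
+-------+--+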
Conversion via case-class reflection (most common)
Create a case class:
scala> case class People(name: String, age: Int)
defined class People
Use the case class to convert the RDD into a DataFrame:
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28

scala> rdd2.toDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
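Because the column names and types are inferred by reflection from the case-class fields, the schema can be inspected directly (output shown as expected on Spark 2.x):

scala> rdd2.toDF.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)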
Conversion via the API (for reference)
package day05

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object DataFrameDemo2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Word Count")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
    // Map to an RDD[Row], since a DataFrame is really just a Dataset[Row]
    val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
    // Build the schema as a StructType; field order and types must match each Row
    val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
    val df: DataFrame = spark.createDataFrame(rowRdd, types)
    df.show
    spark.stop()
  }
}
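The API route is mainly useful when the schema is not known until runtime. A minimal sketch, assuming the column list arrives as a plain string (schemaString and its contents are hypothetical):

// e.g. column names read from a config file or a request
val schemaString = "name age"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val runtimeSchema = StructType(fields)
// every Row must then supply string values in the same order as the fields
val df2 = spark.createDataFrame(rdd.map(x => Row(x._1, x._2.toString)), runtimeSchema)
df2.show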
2. From DataFrame to RDD
Simply call the DataFrame's rdd method to complete the conversion.
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25
scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
Note:
- The elements stored in the resulting RDD have type Row (see the sketch below for reading fields back out).
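A minimal sketch of pulling a typed field back out of each Row, continuing the session above (output shown as expected):

scala> rdd.map(_.getAs[String]("name")).collect
res1: Array[String] = Array(Michael, Andy, Justin)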