11.3 JDBC
Spark SQL 也支持使用 JDBC 从其他的数据库中读取数据. JDBC 数据源比使用 JdbcRDD
更爽一些. 这是因为返回的结果直接就是一个 DataFrame
, DataFrame
更加容易被处理或者与其他的数据源进行 join
.
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame
,通过对DataFrame
一系列的计算后,还可以将数据再写回关系型数据库中。
注意: 如果想spark-shell
操作 jdbc, 需要把相关的 jdbc 驱动 copy 到 jars 目录下
导入依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
从 jdbc 读数据
可以使用通用的load
方法, 也可以使用jdbc
方法
使用通用的
load
方法加载import org.apache.spark.sql.SparkSession object JDBCDemo { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .master("local[*]") .appName("Test") .getOrCreate() import spark.implicits._ val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://hadoop201:3306/rdd") .option("user", "root") .option("password", "aaa") .option("dbtable", "user") .load() jdbcDF.show } }
使用
jdbc
方法加载package day05 import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} object JDBCDemo2 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .master("local[*]") .appName("Test") .getOrCreate() val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "aaa") val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop201:3306/rdd", "user", props) df.show } }
向 jdbc 写入数据
也分两种方法: 通用write.save
和write.jdbc
package day05
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
object JDBCDemo3 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Test")
.getOrCreate()
import spark.implicits._
val rdd: RDD[User1] = spark.sparkContext.parallelize(Array(User1("lisi", 20), User1("zs", 30)))
val ds: Dataset[User1] = rdd.toDS
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop201:3306/rdd")
.option("user", "root")
.option("password", "aaa")
.option("dbtable", "user")
.mode(SaveMode.Append)
.save()
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "aaa")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop201:3306/rdd", "user", props)
}
}
case class User1(name: String, age: Long)