11.3 JDBC

Spark SQL 也支持使用 JDBC 从其他的数据库中读取数据. JDBC 数据源比使用 JdbcRDD更爽一些. 这是因为返回的结果直接就是一个 DataFrame, DataFrame更加容易被处理或者与其他的数据源进行 join.

Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

注意: 如果想spark-shell操作 jdbc, 需要把相关的 jdbc 驱动 copy 到 jars 目录下.

导入依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

从 jdbc 读数据

可以使用通用的load方法, 也可以使用jdbc方法

  1. 使用通用的load方法加载

    
     import org.apache.spark.sql.SparkSession
    
     object JDBCDemo {
         def main(args: Array[String]): Unit = {
             val spark: SparkSession = SparkSession
                 .builder()
                 .master("local[*]")
                 .appName("Test")
                 .getOrCreate()
             import spark.implicits._
    
             val jdbcDF = spark.read
                 .format("jdbc")
                 .option("url", "jdbc:mysql://hadoop201:3306/rdd")
                 .option("user", "root")
                 .option("password", "aaa")
                 .option("dbtable", "user")
                 .load()
             jdbcDF.show
         }
     }
    
  2. 使用 jdbc方法加载

    
     package day05
    
     import java.util.Properties
    
     import org.apache.spark.sql.{DataFrame, SparkSession}
    
     object JDBCDemo2 {
         def main(args: Array[String]): Unit = {
             val spark: SparkSession = SparkSession
                 .builder()
                 .master("local[*]")
                 .appName("Test")
                 .getOrCreate()
    
             val props: Properties = new Properties()
             props.setProperty("user", "root")
             props.setProperty("password", "aaa")
             val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop201:3306/rdd", "user", props)
             df.show
         }
     }
    

向 jdbc 写入数据

也分两种方法: 通用write.savewrite.jdbc

package day05

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object JDBCDemo3 {


    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("Test")
            .getOrCreate()
        import spark.implicits._
        val rdd: RDD[User1] = spark.sparkContext.parallelize(Array(User1("lisi", 20), User1("zs", 30)))
        val ds: Dataset[User1] = rdd.toDS
        ds.write
            .format("jdbc")
            .option("url", "jdbc:mysql://hadoop201:3306/rdd")
            .option("user", "root")
            .option("password", "aaa")
            .option("dbtable", "user")
            .mode(SaveMode.Append)
            .save()
        val props: Properties = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "aaa")
        ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop201:3306/rdd", "user", props)
    }
}


case class User1(name: String, age: Long)
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-09 00:21:43

results matching ""

    No results matching ""