7.6 从 MySQL 数据库读写数据

引入 Mysql 依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

从 Mysql 读取数据

package day04

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Reads rows from a MySQL table into an RDD through [[JdbcRDD]] and prints them
  * on the driver.
  */
object JDBCDemo {
    /**
      * Program entry point: builds a local SparkContext, loads `(id, name)` pairs
      * from the `user` table and prints every pair.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
        val sc = new SparkContext(conf)
        try {
            // Parameters used to connect to MySQL
            val driver = "com.mysql.jdbc.Driver"
            val url = "jdbc:mysql://hadoop201:3306/rdd"
            val userName = "root"
            val passWd = "aaa"

            val rdd = new JdbcRDD(
                sc,
                // Connection factory: loads the driver class, then opens a connection.
                () => {
                    Class.forName(driver)
                    DriverManager.getConnection(url, userName, passWd)
                },
                // The two `?` placeholders receive the id bounds of each partition's sub-range.
                "select id, name from user where id >= ? and id <= ?",
                1,  // lower bound of the id range
                20, // upper bound of the id range
                2,  // number of partitions the id range is split into
                // Map each row of the result set to an (id, name) tuple.
                result => (result.getInt(1), result.getString(2))
            )
            rdd.collect.foreach(println)
        } finally {
            // Always release the SparkContext, even if the job fails.
            sc.stop()
        }
    }
}

向 Mysql 写入数据

package day04

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Writes the elements of an RDD into a MySQL table, using one JDBC connection
  * per partition.
  */
object JDBCDemo2 {
    /**
      * Program entry point: builds a local SparkContext, parallelizes two
      * `(Int, String)` pairs and inserts each pair as a row of the `user` table.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
        val sc = new SparkContext(conf)
        try {
            // Parameters used to connect to MySQL
            val driver = "com.mysql.jdbc.Driver"
            val url = "jdbc:mysql://hadoop201:3306/rdd"
            val userName = "root"
            val passWd = "aaa"

            val rdd: RDD[(Int, String)] = sc.parallelize(Array((110, "police"), (119, "fire")))
            // Run the given function once per partition so that a single connection
            // is shared by all rows of that partition.
            rdd.foreachPartition(it => {
                // Skip empty partitions so no connection is opened for nothing.
                if (it.hasNext) {
                    Class.forName(driver)
                    val conn: Connection = DriverManager.getConnection(url, userName, passWd)
                    try {
                        // Prepare the statement once and reuse it for every row of the
                        // partition instead of creating (and leaking) one per row.
                        val statement: PreparedStatement = conn.prepareStatement("insert into user values(?, ?)")
                        try {
                            it.foreach { case (id, name) =>
                                statement.setInt(1, id)
                                statement.setString(2, name)
                                statement.executeUpdate()
                            }
                        } finally {
                            statement.close()
                        }
                    } finally {
                        // Release the connection even when an insert fails.
                        conn.close()
                    }
                }
            })
        } finally {
            // Always release the SparkContext, even if the job fails.
            sc.stop()
        }
    }
}
Copyright © 尚硅谷大数据 2019 all rights reserved,powered by Gitbook
该文件最后修订时间: 2019-08-09 00:21:43

results matching ""

    No results matching ""