7.6 Reading and Writing Data with MySQL
Add the MySQL JDBC driver dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
(The 5.1.x connector registers the driver class com.mysql.jdbc.Driver used below; the 8.x connector renamed it to com.mysql.cj.jdbc.Driver.)
Reading data from MySQL
Spark ships with org.apache.spark.rdd.JdbcRDD for reading a table over JDBC. The query must contain exactly two ? placeholders, which JdbcRDD binds to the lower and upper bounds of each partition's slice of the key range.
package day04

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object JDBCDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // MySQL connection parameters
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop201:3306/rdd"
    val userName = "root"
    val passWd = "aaa"

    val rdd = new JdbcRDD(
      sc,
      // Connection factory, invoked on the executors
      () => {
        Class.forName(driver)
        DriverManager.getConnection(url, userName, passWd)
      },
      // The two ? placeholders receive the partition's key-range bounds
      "select id, name from user where id >= ? and id <= ?",
      1,  // lower bound of the key range
      20, // upper bound of the key range
      2,  // number of partitions the range [1, 20] is split into
      // Map each ResultSet row to an RDD element
      result => (result.getInt(1), result.getString(2))
    )
    rdd.collect.foreach(println)
    sc.stop()
  }
}
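If you do not want to write a row-mapping function by hand, JdbcRDD also provides JdbcRDD.resultSetToObjectArray, which is in fact the default mapRow and turns each row into an Array[Object]. Below is a minimal sketch against the same user table and connection parameters as above; the object name JDBCDemo3 is just an illustrative choice.
package day04

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object JDBCDemo3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://hadoop201:3306/rdd", "root", "aaa")
      },
      "select id, name from user where id >= ? and id <= ?",
      1, 20, 2,
      JdbcRDD.resultSetToObjectArray _ // default mapRow: each row becomes an Array[Object]
    )
    // Each element is an Array[Object]; format it for printing
    rdd.collect.foreach(row => println(row.mkString(", ")))
    sc.stop()
  }
}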
Writing data to MySQL
Open the connection inside foreachPartition on the executors: java.sql.Connection is not serializable, so it cannot be created on the driver and captured by the closure.
package day04

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object JDBCDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // MySQL connection parameters
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop201:3306/rdd"
    val userName = "root"
    val passWd = "aaa"

    val rdd: RDD[(Int, String)] = sc.parallelize(Array((110, "police"), (119, "fire")))
    // Run the function once per partition, so each partition opens a single connection
    rdd.foreachPartition(it => {
      Class.forName(driver)
      val conn: Connection = DriverManager.getConnection(url, userName, passWd)
      // Prepare the statement once and reuse it for every row in the partition
      val statement: PreparedStatement = conn.prepareStatement("insert into user values(?, ?)")
      it.foreach(x => {
        statement.setInt(1, x._1)
        statement.setString(2, x._2)
        statement.executeUpdate()
      })
      // Release JDBC resources when the partition is done
      statement.close()
      conn.close()
    })
    sc.stop()
  }
}
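For larger partitions, issuing one INSERT round trip per row is slow. A common refinement is JDBC batching: disable auto-commit, accumulate rows with addBatch, and flush them with executeBatch. Here is a minimal sketch under the same connection parameters; the object name JDBCBatchWriteDemo and the batch size of 500 are arbitrary choices, not from the original text.
package day04

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object JDBCBatchWriteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop201:3306/rdd"
    val userName = "root"
    val passWd = "aaa"

    val rdd = sc.parallelize(Array((110, "police"), (119, "fire")))
    rdd.foreachPartition { it =>
      Class.forName(driver)
      val conn = DriverManager.getConnection(url, userName, passWd)
      conn.setAutoCommit(false) // commit once per batch instead of once per row
      val statement = conn.prepareStatement("insert into user values(?, ?)")
      var count = 0
      it.foreach { case (id, name) =>
        statement.setInt(1, id)
        statement.setString(2, name)
        statement.addBatch()
        count += 1
        // Flush every 500 rows; tune the batch size to your workload
        if (count % 500 == 0) {
          statement.executeBatch()
          conn.commit()
        }
      }
      statement.executeBatch() // flush any remaining rows
      conn.commit()
      statement.close()
      conn.close()
    }
    sc.stop()
  }
}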