4.2 file source
4.2.1 读取普通文件夹内的文件
package com.atguigu.ss
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author lzc
* Date 2019/8/13 9:01 AM
*/
object ReadFromFile {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("ReadFromFile")
.getOrCreate()
// 定义 Schema, 用于指定列名以及列中的数据类型
val userSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("job", StringType)
val user: DataFrame = spark.readStream
.format("csv")
.schema(userSchema)
.load("/Users/lzc/Desktop/csv") // 必须是目录, 不能是文件名
val query: StreamingQuery = user.writeStream
.outputMode("append")
.trigger(Trigger.ProcessingTime(0)) // 触发器 数字表示毫秒值. 0 表示立即处理
.format("console")
.start()
query.awaitTermination()
}
}
注意:
前面获取user
的代码也可以使用下面的替换:
val user: DataFrame = spark.readStream
.schema(userSchema)
.csv("/Users/lzc/Desktop/csv")
4.2.2 读取自动分区的文件夹内的文件
当文件夹被命名为 "key=value" 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区.
如果文件夹的命名规则不是"key=value"形式, 则不会触发自动分区. 另外, 同级目录下的文件夹的命名规则必须一致.
步骤 1: 创建如下目录结构
文件内容:
user1.csv
lisi,male,18
zhiling,female,28
user2.csv
lili,femal,19
fengjie,female,40
步骤 2: 创建如下代码
package com.atguigu.ss
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author lzc
* Date 2019/8/13 9:01 AM
*/
object ReadFromFile2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("ReadFromFile")
.getOrCreate()
// 定义 Schema, 用于指定列名以及列中的数据类型
val userSchema: StructType = new StructType().add("name", StringType).add("sex", StringType).add("age", IntegerType)
val user: DataFrame = spark.readStream
.schema(userSchema)
.csv("/Users/lzc/Desktop/csv")
val query: StreamingQuery = user.writeStream
.outputMode("append")
.trigger(Trigger.ProcessingTime(0)) // 触发器 数字表示毫秒值. 0 表示立即处理
.format("console")
.start()
query.awaitTermination()
}
}