4.2 file source

4.2.1 读取普通文件夹内的文件

package com.atguigu.ss

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 9:01 AM
  */
object ReadFromFile {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ReadFromFile")
            .getOrCreate()

        // 定义 Schema, 用于指定列名以及列中的数据类型
        val userSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("job", StringType)
        val user: DataFrame = spark.readStream
            .format("csv")
            .schema(userSchema)
            .load("/Users/lzc/Desktop/csv")  // 必须是目录, 不能是文件名

        val query: StreamingQuery = user.writeStream
            .outputMode("append")
            .trigger(Trigger.ProcessingTime(0)) // 触发器 数字表示毫秒值. 0 表示立即处理
            .format("console")
            .start()
        query.awaitTermination()
    }
}

注意:

前面获取user的代码也可以使用下面的替换:

val user: DataFrame = spark.readStream
            .schema(userSchema)
            .csv("/Users/lzc/Desktop/csv")

4.2.2 读取自动分区的文件夹内的文件

当文件夹被命名为 "key=value" 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区.

如果文件夹的命名规则不是"key=value"形式, 则不会触发自动分区. 另外, 同级目录下的文件夹的命名规则必须一致.

步骤 1: 创建如下目录结构

文件内容:

user1.csv

lisi,male,18
zhiling,female,28

user2.csv

lili,femal,19
fengjie,female,40

步骤 2: 创建如下代码

package com.atguigu.ss

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 9:01 AM
  */
object ReadFromFile2 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ReadFromFile")
            .getOrCreate()

        // 定义 Schema, 用于指定列名以及列中的数据类型
        val userSchema: StructType = new StructType().add("name", StringType).add("sex", StringType).add("age", IntegerType)

        val user: DataFrame = spark.readStream
            .schema(userSchema)
            .csv("/Users/lzc/Desktop/csv")

        val query: StreamingQuery = user.writeStream
            .outputMode("append")
            .trigger(Trigger.ProcessingTime(0)) // 触发器 数字表示毫秒值. 0 表示立即处理
            .format("console")
            .start()
        query.awaitTermination()
    }
}

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""