Chapter 4 Operating on Streaming DataFrames and Streaming Datasets
The core of using Structured Streaming is applying operations to Streaming DataFrames and Streaming Datasets.
Since Spark 2.0, a DataFrame or Dataset can represent either a static, bounded table or a streaming, unbounded table.
Just as with static Datasets/DataFrames, we can use the common entry point SparkSession to create streaming Datasets/DataFrames from streaming sources and apply to them the same operations we would apply to static Datasets/DataFrames.
Calling spark.readStream returns a DataStreamReader; loading a streaming source through this object yields a streaming DataFrame.
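As a minimal sketch (the host, port, and application name below are assumptions for illustration, e.g. a netcat server started with `nc -lk 9999`), the following Scala code obtains a DataStreamReader from spark.readStream, loads a socket source, and writes the resulting streaming DataFrame to the console:

```scala
import org.apache.spark.sql.SparkSession

object SocketSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SocketSourceDemo")
      .getOrCreate()

    // spark.readStream returns a DataStreamReader;
    // load() then produces an unbounded (streaming) DataFrame.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost") // assumed host
      .option("port", "9999")      // assumed port
      .load()

    // A streaming DataFrame supports the same operations as a static one;
    // here we simply echo each micro-batch to the console.
    val query = lines.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```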
Spark ships with several built-in streaming sources, which cover essentially all common needs:
- File source: reads files placed in a directory as a stream of data. Supported file formats: text, csv, json, orc, parquet. Note that files must appear in the given directory atomically; on most file systems this can be achieved with a file move operation. (A sketch follows this list.)
- Kafka source: reads data from Kafka. Currently compatible with Kafka broker versions 0.10.0 and higher. (A sketch follows this list.)
- Socket source (for testing): reads UTF-8 text data from a socket connection. The listening socket lives in the driver. Note that this source should be used only for testing.
- Rate source (for testing): generates data at a specified number of rows per second; each output row contains a timestamp and a value, where timestamp is a Timestamp (the time the row was generated) and value is a Long containing the message count. Intended for testing and benchmarking. (A sketch follows the options table below.)
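The sketches below (intended for spark-shell, where `spark` is predefined) show how the file source and the Kafka source might be loaded; the directory, schema, broker address, and topic name are assumptions, and the Kafka example additionally requires the spark-sql-kafka-0-10 connector on the classpath:

```scala
import org.apache.spark.sql.types._

// File source: file-based streaming sources need an explicit schema
// (unless spark.sql.streaming.schemaInference is enabled).
val userSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

val csvDF = spark.readStream
  .schema(userSchema)
  .csv("/tmp/stream-input")   // hypothetical directory; move files into it atomically

// Kafka source: key and value arrive as binary, so they are usually cast to STRING.
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "hadoop102:9092") // assumed broker list
  .option("subscribe", "topic1")                       // assumed topic
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```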
| Source | Options | Fault-tolerant | Notes |
| --- | --- | --- | --- |
| File source | `path`: path to the input directory, common to all file formats. `maxFilesPerTrigger`: maximum number of new files to be considered in every trigger (default: no max). `latestFirst`: whether to process the latest new files first, useful when there is a large backlog of files (default: false). `fileNameOnly`: whether to check new files based on only the filename instead of the full path (default: false); with this set to true, the following files would be considered the same file because their filenames, "dataset.txt", are the same: "file:///dataset.txt", "s3://a/dataset.txt", "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt". For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/R), e.g. for the "parquet" format see DataStreamReader.parquet(). In addition, some session configurations affect certain file formats; see the SQL Programming Guide for details (e.g., for "parquet", see the Parquet configuration section). | Yes | Supports glob paths, but does not support multiple comma-separated paths/globs. |
| Socket source | `host`: host to connect to, must be specified. `port`: port to connect to, must be specified. | No | |
| Rate source | `rowsPerSecond` (e.g. 100, default: 1): how many rows should be generated per second. `rampUpTime` (e.g. 5s, default: 0s): how long to ramp up before the generation speed reaches rowsPerSecond; granularities finer than seconds are truncated to integer seconds. `numPartitions` (e.g. 10, default: Spark's default parallelism): the number of partitions for the generated rows. The source tries its best to reach rowsPerSecond, but the query may be resource constrained; numPartitions can be tweaked to help reach the desired speed. | Yes | |
| Kafka source | See the Kafka Integration Guide. | Yes | |
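To make the rate source options above concrete, here is a small sketch (also for spark-shell; the specific numbers are arbitrary assumptions) that generates 100 rows per second across 3 partitions after a 5-second ramp-up:

```scala
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100") // target rows generated per second
  .option("rampUpTime", "5s")     // ramp up before reaching rowsPerSecond
  .option("numPartitions", "3")   // partitions for the generated rows
  .load()

// The schema is fixed: timestamp (Timestamp) and value (Long).
rateDF.printSchema()

rateDF.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()
```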