5.3.2 append 模式下使用 wartermark
把前一个案例中的update
改成append
即可.
val query: StreamingQuery = wordCounts.writeStream
.outputMode("append")
.trigger(Trigger.ProcessingTime(0))
.format("console")
.option("truncate", "false")
.start
在 append 模式中, 仅输出新增的数据, 且输出后的数据无法变更.
测试:
- 输入数据:
2019-08-14 10:55:00,dog
这个条数据作为第一批数据. 按照window($"timestamp", "10 minutes", "2 minutes")
得到 5 个窗口. 由于此时初始 watermask=0, 当前批次中所有窗口的结束时间均大于 watermask.
但是 Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容. 因此, 基于 Append 模式的特点, 这时并不会输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermask, 即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出, 并移除内存中对应窗口的聚合状态.
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+
然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark. 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53
- 输入数据:
2019-08-14 11:00:00,dog
这条数据作为第二批数据, 计算得到 5 个窗口. 此时的watermark=10:53, 所有的窗口的结束时间均大于 watermark, 仍然不会输出.
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+
然后计算 watermark = 11:00 - 2min = 10:58
- 输入数据:
2019-08-14 10:55:00,dog
相当于一条延迟数据.
这条数据作为第 3 批次, 计算得到 5 个窗口. 此时的 watermark = 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58.
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1 |
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1 |
则意味着这两个窗口的数据不会再发生变化, 此时输出这个两个窗口的聚合结果, 并在内存中清除这两个窗口的状态.
所以这次输出结果:
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1 |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1 |
+------------------------------------------+----+-----+
第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变.(因为 watermask 只能增加不能减少)