5.3.2 append 模式下使用 wartermark

把前一个案例中的update改成append即可.


val query: StreamingQuery = wordCounts.writeStream
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(0))
    .format("console")
    .option("truncate", "false")
    .start

在 append 模式中, 仅输出新增的数据, 且输出后的数据无法变更.

测试:

  1. 输入数据:2019-08-14 10:55:00,dog

这个条数据作为第一批数据. 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 个窗口. 由于此时初始 watermask=0, 当前批次中所有窗口的结束时间均大于 watermask.

但是 Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容. 因此, 基于 Append 模式的特点, 这时并不会输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermask, 即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出, 并移除内存中对应窗口的聚合状态.

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark. 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53

  1. 输入数据:2019-08-14 11:00:00,dog

这条数据作为第二批数据, 计算得到 5 个窗口. 此时的watermark=10:53, 所有的窗口的结束时间均大于 watermark, 仍然不会输出.

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后计算 watermark = 11:00 - 2min = 10:58

  1. 输入数据:2019-08-14 10:55:00,dog

相当于一条延迟数据.

这条数据作为第 3 批次, 计算得到 5 个窗口. 此时的 watermark = 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58.

|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |

则意味着这两个窗口的数据不会再发生变化, 此时输出这个两个窗口的聚合结果, 并在内存中清除这两个窗口的状态.

所以这次输出结果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
+------------------------------------------+----+-----+

第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变.(因为 watermask 只能增加不能减少)

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""