✅ Azure 流分析 ✅ 构造事件流
所有数据流事件都有一个与之关联的时间戳。 默认情况下,事件中心和 IoT 中心的事件根据事件中心或 IoT 中心接收事件的时间进行时间戳处理;Blob 存储中的事件由 Blob 的上次修改时间时间戳。 如果重新启动或重新运行作业,事件的时间戳不会更改。
许多流式处理应用程序需要使用事件发生的确切时间戳,而不是到达时间。 例如,在销售点应用程序中,可能需要与记录付款时间对应的事件时间戳,而不是支付事件到达事件引入服务的时间。 此外,异地分布式系统和网络延迟可能会导致不可预知的到达时间,从而使应用程序时间在流式处理应用程序中更加可靠。 对于这些情况,TIMESTAMP BY 子句允许指定自定义时间戳值。 该值可以是 DATETIME 类型的事件有效负载或表达式中的任何字段。 还支持符合 任何 ISO 8601 格式的字符串值。
请注意,使用自定义时间戳(TIMESTAMP BY 子句)可能会导致 Azure 流分析出于两个原因而按顺序引入事件,
- 单个事件生成者可能有不同的系统时钟(和倾斜)。
- 来自单个事件生成者的事件可能会在传输过程中延迟,例如,由生成者站点的网络不可用。
虽然事件生成者之间的混乱可能很大,但单个生成者的事件中的混乱通常很小,甚至不存在。 如果查询仅独立处理来自每个事件生成者的数据,则处理其时间线中的每个生成者的事件比管理生成者之间的时间偏差更有效。 Azure 流分析通过指定 over <over over spec> 子子句来支持子流,以启用在独立时间线中处理事件。 有关 OVER 子句对作业处理的影响,请参阅“OVER 子句与事件排序交互”。
语法
TIMESTAMP BY scalar_expression [OVER <over spec> ]
<over spec> ::=
{ column_name | expression } [,...n ]
注解
检索事件时间戳
可以使用 System.Timestamp() 属性在查询的任何部分的 SELECT 语句中检索事件时间戳。
OVER 子句与事件排序交互
使用 OVER 子句时,将修改 Azure 流分析的事件处理的几个方面:
在规范>的单个值元组<内应用最大无序容错。 也就是说,仅当事件与来自同一事件生成者的其他事件不一致时,事件才会被视为无序事件。
例如,如果来自同一事件生成者的事件始终排序,并且将导致立即处理,则可以使用值“0”。 另一方面,在此处使用大值将引入处理延迟,同时等待无序事件进行组装。
全局应用最大延迟到达容错(就好像未使用 OVER)。 也就是说,如果事件选择的时间戳(在 TIMESTAMP BY 子句中)离到达时间太远,则被视为延迟到达事件。
请注意,此处使用大值不会引入处理延迟,并且事件仍将立即处理(或根据最大无序容错)。 值数天不合理。 但是,使用异常长的值可能会影响处理作业所需的内存量。
每个事件生成者的输出事件在计算时生成,这意味着输出事件可能具有无序时间戳;但是,它们将按顺序排列在规范>的<单个值元组内。
局限性与限制
TIMESTAMP BY OVER 子句具有以下用法限制:
TIMESTAMP BY OVER 子句必须用于查询的所有输入,或者不用于其中任何输入。
TIMESTAMP BY OVER 子句仅支持完全并行作业或单个分区作业。
如果输入流具有多个分区,则 OVER 子句必须与 PARTITION BY 子句一起使用。 PartitionId 列必须指定为 TIMESTAMP BY OVER 列的一部分。
如果使用 TIMESTAMP BY OVER 子句,则子句中的列名必须在 GROUP BY 语句和所有 JOIN 谓词中用作在流之间联接时的分组键。
在 SELECT 语句或任何其他查询子句中创建的列不能用于 TIMESTAMP BY 子句中,必须使用输入有效负载中的字段。 例如, CROSS APPLY 的结果不能用作 TIMESTAMP BY 的目标值。 但是,可以使用一个执行 CROSS APPLY 的 Azure 流分析作业,并使用第二个作业来执行 TIMESTAMP BY。
System.Timestamp()不能用于 TIMESTAMP BY,因为 TIMESTAMP BY 是建立 System.Timestamp() 值的内容。
例子
示例 1 – 从有效负载访问时间戳字段
使用 EntryTime
有效负载中的字段作为事件时间戳
SELECT
EntryTime,
LicensePlate,
State
FROM input TIMESTAMP BY EntryTime
示例 2 - 将有效负载中的 UNIX 时间用作事件时间戳
UNIX 系统通常使用 POSIX(或纪元)时间,定义为自 1970 年 1 月 1 日星期四 1 日 00:00:00 协调世界时(UTC)以来经过的毫秒数。
此示例演示如何使用包含 Epoch 时间的数字“epochtime”字段作为事件时间戳。
SELECT
System.Timestamp(),
LicensePlate,
State
FROM input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z')
示例 3 - 异类时间戳
假设处理包含两种类型的事件“A”和“B”的异类数据流。 事件“A”在字段“timestampA”中具有时间戳数据,事件“B”在字段“timestampB”中有时间戳。
此示例演示如何编写 TIMESTAMP BY,以便能够使用这两种类型的事件/时间戳。
SELECT
System.Timestamp(),
eventType,
eventValue,
FROM input TIMESTAMP BY
(CASE eventType
WHEN 'A' THEN timestampA
WHEN 'B' THEN timestampB
ELSE NULL END)
示例 4 – 处理分区查询中的多个时间线
处理来自不同发送方(收费站)的数据,而无需跨不同的收费站 ID 应用时间策略。 输入数据基于 TollId 进行分区。
SELECT
TollId,
COUNT(*) AS Count
FROM input
TIMESTAMP BY EntryTime OVER TollId, PartitionId
PARTITION BY PartitionId
GROUP BY TUMBLINGWINDOW(minute,3), TollId, PartitionId