diff --git a/Flink_SQL/双流Join底层原理.md b/Flink_SQL/双流Join底层原理.md index 1f9f8bd..d1197b6 100644 --- a/Flink_SQL/双流Join底层原理.md +++ b/Flink_SQL/双流Join底层原理.md @@ -145,10 +145,181 @@ Full Join 会产生回撤流。 # Interval Join +Interval JOIN 相对于UnBounded的双流JOIN来说是Bounded JOIN。就是每条流的每一条数据会与另一条流上的不同时间区域 +的数据进行JOIN。 +## 语法 + +```sql +SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION +``` + +TIMEBOUND_EXPRESSION 有两种写法,如下: +- L.time between LowerBound(R.time) and UpperBound(R.time) +- R.time between LowerBound(L.time) and UpperBound(L.time) +- 带有时间属性(L.time/R.time)的比较表达式。 + + +Interval JOIN 的语义就是每条数据对应一个时间区间的数据区间,比如有一个订单表Orders(orderId, productName, +orderTime)和付款表Payment(orderId, payType, payTime)。假设我们要统计在下单一小时内付款的订单信息。SQL查询如下: + + +```sql +SELECT + o.orderId, + o.productName, + p.payType, + o.orderTime, + cast(payTime as timestamp) as payTime +FROM + Orders AS o JOIN Payment AS p ON + o.orderId = p.orderId AND + p.payTime BETWEEN orderTime AND + orderTime + INTERVAL '1' HOUR +``` + +### Orders订单数据 + +| orderId | productName | orderTime | +|---|---|---| +| 001 | iphone | 2018-12-26 04:53:22.0 | +| 002 | mac | 2018-12-26 04:53:23.0 | +| 003 | book | 2018-12-26 04:53:24.0 | +| 004 | cup | 2018-12-26 04:53:38.0 | + +### Payment付款数据 + +| orderId | payType | payTime | +|---|---|---| +| 001 | alipay | 2018-12-26 05:51:41.0 | +| 002 | card | 2018-12-26 05:53:22.0 | +| 003 | card | 2018-12-26 05:53:30.0 | +| 004 | alipay | 2018-12-26 05:53:31.0 | + +符合语义的预期结果是 订单id为003的信息不出现在结果表中,因为下单时间`2018-12-26 04:53:24.0`, 付款时间是 +`2018-12-26 05:53:30.0`超过了1小时付款。 + +那么预期的结果信息如下: + +| orderId | productName | payType | orderTime | payTime +|---|---|---|---|----| +| 001 | iphone | alipay | 2018-12-26 04:53:22.0 | 2018-12-26 05:51:41.0 | +| 002 | mac | card | 2018-12-26 04:53:23.0 | 2018-12-26 05:53:22.0 | +| 004 | cup | alipay | 2018-12-26 04:53:38.0 | 2018-12-26 05:53:31.0 | + +这样Id为003的订单是无效订单,可以更新库存继续售卖。 + +接下来我们以图示的方式直观说明Interval JOIN的语义,我们对上面的示例需求稍微变化一下: 订单可以预付款(不管是 +否合理,我们只是为了说明语义)也就是订单 前后 1小时的付款都是有效的。SQL语句如下: + +```sql +SELECT + ... +FROM + Orders AS o JOIN Payment AS p ON + o.orderId = p.orderId AND + p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND + orderTime + INTERVAL '1' HOUR +``` + +## 总结 + +- Flink的流关联当前只能支持两条流的关联 +- Flink同时支持基于EventTime和ProcessingTime的流流join。 +- Interval join 已经支持inner ,left outer, right outer , full outer 等类型的join,由此来看官网对interval join +类型支持的说明不够准确。 +- 当前版本Interval join的两条流的消息清理是基于两条流共有的combinedWatermark(较小的流的watermark)。 +- 流的watermark不会用于将消息直接过滤掉,即时消息在本流中的watermark表示中已经迟到,但会直接将迟到的消息根据 +相应的join类型或输出或丢弃。 # 维表Join +维表(Dimension Table)是来自数仓建模的概念。在数仓模型中,事实表(Fact Table)是指存储有事实记录的表,如系统 +日志、销售记录等,而维表是与事实表相对应的一种表,它保存了事实表中指定属性的相关详细信息,可以跟事实表做关 +联;相当于将事实表上经常重复出现的属性抽取、规范出来用一张表进行管理。 +在实际生产中,我们经常会有这样的需求,以原始数据流作为基础,关联大量的外部表来补充一些属性。这种查询操作就是 +典型的维表 JOIN。 + +## 使用维表的好处 + +- 缩小了事实表的大小。 +- 便于维度的管理和维护,增加、删除和修改维度的属性,不必对事实表的大量记录进行改动。 +- 维度表可以为多个事实表重用,以减少重复工作。 + +## 维表JOIN使用 + +由于维表是一张不断变化的表(静态表视为动态表的一种特例),因此在维表 JOIN 时,需指明这条记录关联维表快照的对 +应时刻。Flink SQL 的维表 JOIN 语法引入了 Temporal Table 的标准语法,用于声明流数据关联的是维表哪个时刻的快照。 + +需要注意是,目前原生 Flink SQL 的维表 JOIN 仅支持事实表对当前时刻维表快照的关联(处理时间语义),而不支持事实 +表 rowtime 所对应的维表快照的关联(事件时间语义)。 + +### 语法说明 + +Flink SQL 中使用语法`for SYSTEM_TIME as of PROC_TIME()`来标识维表JOIN。仅支持`INNER JOIN`和`LEFT JOIN`。 + +```sql +SELECT + column-namesFROM + table1 [AS ][LEFT] +JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS ] +ON table1.column-name1 = table2.key-name1 +``` + +***注意:*** +`table1.proctime`表示`table1`的`proctime`字段。 + +### 使用示例 + +下面用一个简单的示例来展示维表 JOIN 语法。假设我们有一个 Orders 订单数据流,希望根据用户 ID 补全订单中的用户 +信息,因此需要跟 Customer 维度表进行关联。 + +```sql +CREATE TABLE Orders ( + id INT, + price DOUBLE, + quantity INT, + proc_time AS PROCTIME(), + PRIMARY KEY(id) NOT ENFORCED +) WITH ( + 'connector' = 'datagen', + 'fields.id.kind' = 'sequence', + 'rows-per-second' = '10' +); +CREATE TABLE Customers ( + id INT, + name STRING, + country STRING, + zip STRING, + PRIMARY KEY(id) NOT ENFORCED +) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', + 'table-name' = 'customers' +); +CREATE TABLE OrderDetails ( + id INT, + total_price DOUBLE, + country STRING, + zip STRING, + PRIMARY KEY(id) NOT ENFORCED +) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://mysqlhost:3306/orderdb', + 'table-name' = 'orderdetails' +); +-- enrich each order with customer information +INSERT INTO OrderDetails +SELECT + o.id, + o.price, + o.quantity, + c.country, + c.zipFROM +Orders AS o +JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c +ON o.id = c.id; +```