Flink 双流Join基本原理-维表Join
This commit is contained in:
parent
34c369c3cf
commit
86ce9fef9f
@ -145,10 +145,195 @@ Full Join 会产生回撤流。
|
||||
|
||||
# Interval Join
|
||||
|
||||
Interval JOIN 相对于UnBounded的双流JOIN来说是Bounded JOIN。就是每条流的每一条数据会与另一条流上的不同时间区域
|
||||
的数据进行JOIN。
|
||||
|
||||
## 语法
|
||||
|
||||
```sql
|
||||
SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION
|
||||
```
|
||||
|
||||
TIMEBOUND_EXPRESSION 有两种写法,如下:
|
||||
- L.time between LowerBound(R.time) and UpperBound(R.time)
|
||||
- R.time between LowerBound(L.time) and UpperBound(L.time)
|
||||
- 带有时间属性(L.time/R.time)的比较表达式。
|
||||
|
||||
|
||||
Interval JOIN 的语义就是每条数据对应一个时间区间的数据区间,比如有一个订单表Orders(orderId, productName,
|
||||
orderTime)和付款表Payment(orderId, payType, payTime)。假设我们要统计在下单一小时内付款的订单信息。SQL查询如下:
|
||||
|
||||
|
||||
```sql
|
||||
SELECT
|
||||
o.orderId,
|
||||
o.productName,
|
||||
p.payType,
|
||||
o.orderTime,
|
||||
cast(payTime as timestamp) as payTime
|
||||
FROM
|
||||
Orders AS o JOIN Payment AS p ON
|
||||
o.orderId = p.orderId AND
|
||||
p.payTime BETWEEN orderTime AND
|
||||
orderTime + INTERVAL '1' HOUR
|
||||
```
|
||||
|
||||
### Orders订单数据
|
||||
|
||||
| orderId | productName | orderTime |
|
||||
|---|---|---|
|
||||
| 001 | iphone | 2018-12-26 04:53:22.0 |
|
||||
| 002 | mac | 2018-12-26 04:53:23.0 |
|
||||
| 003 | book | 2018-12-26 04:53:24.0 |
|
||||
| 004 | cup | 2018-12-26 04:53:38.0 |
|
||||
|
||||
### Payment付款数据
|
||||
|
||||
| orderId | payType | payTime |
|
||||
|---|---|---|
|
||||
| 001 | alipay | 2018-12-26 05:51:41.0 |
|
||||
| 002 | card | 2018-12-26 05:53:22.0 |
|
||||
| 003 | card | 2018-12-26 05:53:30.0 |
|
||||
| 004 | alipay | 2018-12-26 05:53:31.0 |
|
||||
|
||||
符合语义的预期结果是 订单id为003的信息不出现在结果表中,因为下单时间`2018-12-26 04:53:24.0`, 付款时间是
|
||||
`2018-12-26 05:53:30.0`超过了1小时付款。
|
||||
|
||||
那么预期的结果信息如下:
|
||||
|
||||
| orderId | productName | payType | orderTime | payTime
|
||||
|---|---|---|---|----|
|
||||
| 001 | iphone | alipay | 2018-12-26 04:53:22.0 | 2018-12-26 05:51:41.0 |
|
||||
| 002 | mac | card | 2018-12-26 04:53:23.0 | 2018-12-26 05:53:22.0 |
|
||||
| 004 | cup | alipay | 2018-12-26 04:53:38.0 | 2018-12-26 05:53:31.0 |
|
||||
|
||||
这样Id为003的订单是无效订单,可以更新库存继续售卖。
|
||||
|
||||
接下来我们以图示的方式直观说明Interval JOIN的语义,我们对上面的示例需求稍微变化一下: 订单可以预付款(不管是
|
||||
否合理,我们只是为了说明语义)也就是订单 前后 1小时的付款都是有效的。SQL语句如下:
|
||||
|
||||
```sql
|
||||
SELECT
|
||||
...
|
||||
FROM
|
||||
Orders AS o JOIN Payment AS p ON
|
||||
o.orderId = p.orderId AND
|
||||
p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
|
||||
orderTime + INTERVAL '1' HOUR
|
||||
```
|
||||
|
||||
## 总结
|
||||
|
||||
- Flink的流关联当前只能支持两条流的关联
|
||||
- Flink同时支持基于EventTime和ProcessingTime的流流join。
|
||||
- Interval join 已经支持inner ,left outer, right outer , full outer 等类型的join,由此来看官网对interval join
|
||||
类型支持的说明不够准确。
|
||||
- 当前版本Interval join的两条流的消息清理是基于两条流共有的combinedWatermark(较小的流的watermark)。
|
||||
- 流的watermark不会用于将消息直接过滤掉,即时消息在本流中的watermark表示中已经迟到,但会直接将迟到的消息根据
|
||||
相应的join类型或输出或丢弃。
|
||||
|
||||
|
||||
# 维表Join
|
||||
|
||||
维表(Dimension Table)是来自数仓建模的概念。在数仓模型中,事实表(Fact Table)是指存储有事实记录的表,如系统
|
||||
日志、销售记录等,而维表是与事实表相对应的一种表,它保存了事实表中指定属性的相关详细信息,可以跟事实表做关
|
||||
联;相当于将事实表上经常重复出现的属性抽取、规范出来用一张表进行管理。
|
||||
|
||||
在实际生产中,我们经常会有这样的需求,以原始数据流作为基础,关联大量的外部表来补充一些属性。这种查询操作就是
|
||||
典型的维表 JOIN。
|
||||
|
||||
## 使用维表的好处
|
||||
|
||||
- 缩小了事实表的大小。
|
||||
- 便于维度的管理和维护,增加、删除和修改维度的属性,不必对事实表的大量记录进行改动。
|
||||
- 维度表可以为多个事实表重用,以减少重复工作。
|
||||
|
||||
## 维表JOIN使用
|
||||
|
||||
由于维表是一张不断变化的表(静态表视为动态表的一种特例),因此在维表 JOIN 时,需指明这条记录关联维表快照的对
|
||||
应时刻。Flink SQL 的维表 JOIN 语法引入了 Temporal Table 的标准语法,用于声明流数据关联的是维表哪个时刻的快照。
|
||||
|
||||
需要注意是,目前原生 Flink SQL 的维表 JOIN 仅支持事实表对当前时刻维表快照的关联(处理时间语义),而不支持事实
|
||||
表 rowtime 所对应的维表快照的关联(事件时间语义)。
|
||||
|
||||
### 语法说明
|
||||
|
||||
Flink SQL 中使用语法`for SYSTEM_TIME as of PROC_TIME()`来标识维表JOIN。仅支持`INNER JOIN`和`LEFT JOIN`。
|
||||
|
||||
```sql
|
||||
SELECT
|
||||
column-namesFROM
|
||||
table1 [AS <alias1>][LEFT]
|
||||
JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
|
||||
ON table1.column-name1 = table2.key-name1
|
||||
```
|
||||
|
||||
***注意:***
|
||||
`table1.proctime`表示`table1`的`proctime`字段。
|
||||
|
||||
### 使用示例
|
||||
|
||||
下面用一个简单的示例来展示维表 JOIN 语法。假设我们有一个 Orders 订单数据流,希望根据用户 ID 补全订单中的用户
|
||||
信息,因此需要跟 Customer 维度表进行关联。
|
||||
|
||||
```sql
|
||||
CREATE TABLE Orders (
|
||||
id INT,
|
||||
price DOUBLE,
|
||||
quantity INT,
|
||||
proc_time AS PROCTIME(),
|
||||
PRIMARY KEY(id) NOT ENFORCED
|
||||
) WITH (
|
||||
'connector' = 'datagen',
|
||||
'fields.id.kind' = 'sequence',
|
||||
'rows-per-second' = '10'
|
||||
);
|
||||
CREATE TABLE Customers (
|
||||
id INT,
|
||||
name STRING,
|
||||
country STRING,
|
||||
zip STRING,
|
||||
PRIMARY KEY(id) NOT ENFORCED
|
||||
) WITH (
|
||||
'connector' = 'jdbc',
|
||||
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
|
||||
'table-name' = 'customers'
|
||||
);
|
||||
CREATE TABLE OrderDetails (
|
||||
id INT,
|
||||
total_price DOUBLE,
|
||||
country STRING,
|
||||
zip STRING,
|
||||
PRIMARY KEY(id) NOT ENFORCED
|
||||
) WITH (
|
||||
'connector' = 'jdbc',
|
||||
'url' = 'jdbc:mysql://mysqlhost:3306/orderdb',
|
||||
'table-name' = 'orderdetails'
|
||||
);
|
||||
-- enrich each order with customer information
|
||||
INSERT INTO OrderDetails
|
||||
SELECT
|
||||
o.id,
|
||||
o.price,
|
||||
o.quantity,
|
||||
c.country,
|
||||
c.zipFROM
|
||||
Orders AS o
|
||||
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
|
||||
ON o.id = c.id;
|
||||
```
|
||||
|
||||
## Flink SQL 执行流程
|
||||
|
||||
Apache Calcite 是一款开源的 SQL 解析工具,被广泛使用于各个大数据项目中,主要用于解析 SQL 语句。SQL 的执行流程
|
||||
一般分为四个主要阶段:
|
||||
|
||||
- Parse:语法解析,把 SQL 语句转换成抽象语法树(AST),在 Calcite 中用 SqlNode 来表示;
|
||||
- Validate:语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在等,校验之后仍然是 SqlNode 构
|
||||
成的语法树;
|
||||
- Optimize:查询计划优化,包含两个阶段,1)将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,2)使用优
|
||||
化器基于规则进行等价变换,例如谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;
|
||||
- Execute:将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user