一、REGULAR JOIN
- INNER JOIN:在流處理任務中只用兩條流JOIN到才輸出,+[L, R]
- LEFT JOIN:在流處理任務中,左流數據到達之后,不管有沒有JOIN到右流數據都會輸出(JOIN到:+[L, R],沒有JOIN:+[L, NULL]),如果右流數據到達之后,發現左流有輸出+[L, NULL],則會發起回撤,先輸出-[L, NULL],再輸出+[L, R]
- RIGHT JOIN:左右流的執行邏輯和LEFT JOIN一樣
- FULL JOIN:在流處理任務中,左流或者右流的數據到達之后,無論有沒有 Join 到另外一條流的數據,都會輸出(對右流來說:Join 到輸出 +[L, R],沒 Join 到輸出 +[NULL, R];對左流來說:JOIN到輸出 ,沒JOIN到輸出+[L, NULL])。如果一條流的數據到達之后,發現之前另一條流之前輸出過沒有 Join 到的數據,則會發起回撤流(左流數據到達為例:回撤-[NULL, R],輸出 +[L, R],右流數據到達為例:回撤-[L, NULL] ,輸出+[L, R])
- 測試用例
CREATE TABLE show_log_table( log_id BIGINT, show_params STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.show_params.length' = '1', 'fields.log_id.min' = '1', 'fields.log_id.max' = '100' ); CREATE TABLE click_log_table( log_id BIGINT, click_params STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.click_params.length' = '1', 'fields.log_id.min' = '1', 'fields.log_id.max' = '10' ); SELECT * FROM click_log_table LEFT JOIN show_log_table ON click_log_table.log_id = show_log_table.log_id ; SELECT * FROM click_log_table INNER JOIN show_log_table ON click_log_table.log_id = show_log_table.log_id ; SELECT * FROM click_log_table FULL JOIN show_log_table ON click_log_table.log_id = show_log_table.log_id ;
- 注意:
1)實時REGULAR JOIN支持等值JOIN和不等值JOIN,等值JOIN SHUFFLE策略是HASH,非等值JOIN策略是GLOBAL,所有數據發往一個並發,按照非等值條件進行關聯
2)REGULAR JOIN會將兩條流的所有數據都存儲在 State 中,所以 Flink 任務的 State 會無限增大,因此需要為 State 配置合適的 TTL,以防止 State 過大
二、INTERVAL JOIN
- 時間區間JOIN:讓一條流去JOIN另一條流的前后一段時間內的數據,INTERVAL JOIN可以避免回撤流的產生,在某些場景下,下游輸出系統不具備處理回撤流的能力,此時可以借助INTERVAL JOIN
- INNER INTERVAL JOIN:只有兩條流 JOIN 到(滿足ON中的條件:兩條流的數據在時間區間 + 滿足其他等值條件)才輸出,輸出 +[L, R]
- LEFT INTERVAL JOIN:流任務中,左流數據到達之后,如果沒有JOIN到右流的數據,就會等待(放在 State 中等),如果之后右流之后數據到達之后,發現能和剛剛那條左流數據 JOIN 到,則會輸出+[L, R]。事件時間中隨着 Watermark 的推進, 如果發現發現左流 State 中的數據過期了,就把左流中過期的數據從 State 中刪除,然后輸出+[L, R],如果右流 State 中的數據過期了,就直接從 State 中刪除
- RIGHT INTERVAL JOIN:處理邏輯和LEFT INTERVAL JOIN類似
- FULL INTERVAL JOIN:流任務中,左流或者右流的數據到達之后,如果沒有 Join 到另外一條流的數據,就會等待(左流放在左流對應的 State 中等,右流放在右流對應的 State 中等),如果之后另一條流數據到達之后,發現能和剛剛那條數據 Join 到,則會輸出+[L, R]。事件時間中隨着 Watermark 的推進(也支持處理時間),發現 State 中的數據能夠過期了,就將這些數據從 State 中刪除並且輸出(左流過期輸出 +[L, NULL],右流過期輸出 -[NULL, R])
- 案例:
CREATE TABLE show_log_table( log_id BIGINT, show_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.show_params.length' = '1', 'fields.log_id.min' = '1', 'fields.log_id.max' = '100' ); CREATE TABLE click_log_table( log_id BIGINT, click_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.click_params.length' = '1', 'fields.log_id.min' = '1', 'fields.log_id.max' = '10' ); SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params as c_param FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' MINUTE AND click_log_table.row_time + INTERVAL '5' MINUTE ;
三、TEMPORARY JOIN
四、LOOKUP JOIN
五、列轉行