FLINK SQL JOIN類型


一、REGULAR JOIN

  • INNER JOIN:在流處理任務中只用兩條流JOIN到才輸出,+[L, R]
  • LEFT JOIN:在流處理任務中,左流數據到達之后,不管有沒有JOIN到右流數據都會輸出(JOIN到:+[L, R],沒有JOIN:+[L, NULL]),如果右流數據到達之后,發現左流有輸出+[L, NULL],則會發起回撤,先輸出-[L, NULL],再輸出+[L, R]
  • RIGHT JOIN:左右流的執行邏輯和LEFT JOIN一樣
  • FULL JOIN:在流處理任務中,左流或者右流的數據到達之后,無論有沒有 Join 到另外一條流的數據,都會輸出(對右流來說:Join 到輸出 +[L, R],沒 Join 到輸出 +[NULL, R];對左流來說:JOIN到輸出 ,沒JOIN到輸出+[L, NULL])。如果一條流的數據到達之后,發現之前另一條流之前輸出過沒有 Join 到的數據,則會發起回撤流(左流數據到達為例:回撤-[NULL, R],輸出 +[L, R],右流數據到達為例:回撤-[L, NULL] ,輸出+[L, R])  
  • 測試用例
    CREATE TABLE show_log_table(
        log_id      BIGINT,
        show_params STRING
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '5',
      'fields.show_params.length' = '1',
      'fields.log_id.min' = '1',
      'fields.log_id.max' = '100'
    );
    
    CREATE TABLE click_log_table(
        log_id       BIGINT,
        click_params STRING
    )
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '5',
        'fields.click_params.length' = '1',
        'fields.log_id.min' = '1',
        'fields.log_id.max' = '10'
    );
    
    SELECT *
    FROM click_log_table
    LEFT JOIN show_log_table
    ON click_log_table.log_id = show_log_table.log_id
    ;
    
    SELECT *
    FROM click_log_table
    INNER JOIN show_log_table
    ON click_log_table.log_id = show_log_table.log_id
    ;
    
    SELECT *
    FROM click_log_table
    FULL JOIN show_log_table
    ON click_log_table.log_id = show_log_table.log_id
    ;
  • 注意:

    1)實時REGULAR JOIN支持等值JOIN和不等值JOIN,等值JOIN SHUFFLE策略是HASH,非等值JOIN策略是GLOBAL,所有數據發往一個並發,按照非等值條件進行關聯

    2)REGULAR JOIN會將兩條流的所有數據都存儲在 State 中,所以 Flink 任務的 State 會無限增大,因此需要為 State 配置合適的 TTL,以防止 State 過大

二、INTERVAL JOIN

  • 時間區間JOIN:讓一條流去JOIN另一條流的前后一段時間內的數據,INTERVAL JOIN可以避免回撤流的產生,在某些場景下,下游輸出系統不具備處理回撤流的能力,此時可以借助INTERVAL JOIN
  • INNER INTERVAL JOIN:只有兩條流 JOIN 到(滿足ON中的條件:兩條流的數據在時間區間 + 滿足其他等值條件)才輸出,輸出 +[L, R]
  • LEFT INTERVAL JOIN:流任務中,左流數據到達之后,如果沒有JOIN到右流的數據,就會等待(放在 State 中等),如果之后右流之后數據到達之后,發現能和剛剛那條左流數據 JOIN 到,則會輸出+[L, R]。事件時間中隨着 Watermark 的推進, 如果發現發現左流 State 中的數據過期了,就把左流中過期的數據從 State 中刪除,然后輸出+[L, R],如果右流 State 中的數據過期了,就直接從 State 中刪除
  • RIGHT INTERVAL JOIN:處理邏輯和LEFT INTERVAL JOIN類似
  • FULL INTERVAL JOIN:流任務中,左流或者右流的數據到達之后,如果沒有 Join 到另外一條流的數據,就會等待(左流放在左流對應的 State 中等,右流放在右流對應的 State 中等),如果之后另一條流數據到達之后,發現能和剛剛那條數據 Join 到,則會輸出+[L, R]。事件時間中隨着 Watermark 的推進(也支持處理時間),發現 State 中的數據能夠過期了,就將這些數據從 State 中刪除並且輸出(左流過期輸出 +[L, NULL],右流過期輸出 -[NULL, R])  
  • 案例:
    CREATE TABLE show_log_table(
        log_id      BIGINT,
        show_params STRING,
        row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
        WATERMARK FOR row_time AS row_time
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '5',
      'fields.show_params.length' = '1',
      'fields.log_id.min' = '1',
      'fields.log_id.max' = '100'
    );
    
    CREATE TABLE click_log_table(
        log_id       BIGINT,
        click_params STRING,
        row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
        WATERMARK FOR row_time AS row_time
    )
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '5',
        'fields.click_params.length' = '1',
        'fields.log_id.min' = '1',
        'fields.log_id.max' = '10'
    );
    
    SELECT
        show_log_table.log_id as s_id,
        show_log_table.show_params as s_params,
        click_log_table.log_id as c_id,
        click_log_table.click_params as c_param
    FROM show_log_table
    INNER JOIN click_log_table
    ON show_log_table.log_id = click_log_table.log_id
    AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' MINUTE
    AND click_log_table.row_time + INTERVAL '5' MINUTE
    ;

     

三、TEMPORARY JOIN

 

四、LOOKUP JOIN

 

五、列轉行


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM