flink lookup join mysql demo


Flink 1.12 的時候嘗試使用 JDBC SQL Connector kafka 流關聯 mysql 表,使用 lookup cache 緩存 mysql 數據,測試在關聯性能和更新時效的平衡。不過遭遇了失敗,嘗試各種 join 也無法實現,mysql source 使用 InputFormatSource 一次性把 mysql 的數據讀完,mysql source 就退出了。

在 Flink 1.13 的 SQL 文檔看到這個: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#lookup-join

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

jdbc connector 關於 Lookup Cache 的描述還是和上一版本一樣的

JDBC 連接器可以用在時態表關聯中作為一個可 lookup 的 source (又稱為維表),當前只支持同步的查找模式。

默認情況下,lookup cache 是未啟用的,你可以設置 lookup.cache.max-rows and lookup.cache.ttl 參數來啟用。

lookup cache 的主要目的是用於提高時態表關聯 JDBC 連接器的性能。默認情況下,lookup cache 不開啟,所以所有請求都會發送到外部數據庫。 
當 lookup cache 被啟用時,每個進程(即 TaskManager)將維護一個緩存。
Flink 將優先查找緩存,只有當緩存未查找到時才向外部數據庫發送請求,並使用返回的數據更新緩存。
當緩存命中最大緩存行 lookup.cache.max
-rows 或當行超過最大存活時間 lookup.cache.ttl 時,緩存中最老的行將被設置為已過期。
緩存中的記錄可能不是最新的,用戶可以將 lookup.cache.ttl 設置為一個更小的值以獲得更好的刷新數據,但這可能會增加發送到數據庫的請求數。
所以要做好吞吐量和正確性之間的平衡。

測試SQL 如下

CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);

CREATE TEMPORARY TABLE mysql_behavior_conf (
   id int
  ,code STRING
  ,map_val STRING
  ,update_time TIMESTAMP(3)
--   ,primary key (id) not enforced
--   ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
   'connector' = 'jdbc'
   ,'url' = 'jdbc:mysql://localhost:3306/venn'
   ,'table-name' = 'lookup_join_config'
   ,'username' = 'root'
   ,'password' = '123456'
   ,'lookup.cache.max-rows' = '1000'
   ,'lookup.cache.ttl' = '1 minute' -- 緩存時間,即使一直在訪問也會刪除
);

SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.map_val, a.ts
FROM user_log a
  left join mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
  ON a.behavior = c.code
where a.behavior is not null;

Lookup Cache 嘗試成功

查看 mysql Lookup Join 的源碼: LookupJoinRunner

@Override
public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception {
    collector.setCollector(out);
    collector.setInput(in);
    collector.reset();

    // fetcher has copied the input field when object reuse is enabled
    fetcher.flatMap(in, getFetcherCollector());

    if (isLeftOuterJoin && !collector.isCollected()) {
        outRow.replace(in, nullRow);
        outRow.setRowKind(in.getRowKind());
        out.collect(outRow);
    }
}

getFetcherCollector 方法調用 JdbcRowDataLookupFunction.java,查詢和返回緩存的地方

/**
 * This is a lookup method which is called by Flink framework in runtime.
 *
 * @param keys lookup keys
 */
public void eval(Object... keys) {
    // 獲取 key
    RowData keyRow = GenericRowData.of(keys);
    // 如果沒有緩存
    if (cache != null) {
        // 獲取緩存
        List<RowData> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (RowData cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
    // 從源端重新獲取數據
    for (int retry = 0; retry <= maxRetryTimes; retry++) {
        try {
            statement.clearParameters();
            statement = lookupKeyRowConverter.toExternal(keyRow, statement);
            // 查詢 數據庫
            try (ResultSet resultSet = statement.executeQuery()) {
                if (cache == null) {
                    while (resultSet.next()) {
                        collect(jdbcRowConverter.toInternal(resultSet));
                    }
                } else {
                    ArrayList<RowData> rows = new ArrayList<>();
                    while (resultSet.next()) {
                        RowData row = jdbcRowConverter.toInternal(resultSet);
                        rows.add(row);
                        // 返回數據
                        collect(row);
                    }
                    rows.trimToSize();
                    // 放入緩存
                    cache.put(keyRow, rows);
                }
            }
            break;
        } catch (SQLException e) {
            ...略...
        }
    }
}

緩存使用 guava 的 LocalCache

注: ttl 時間是 expireAfterWrite,寫入后固定時間時效

this.cache =
        cacheMaxSize == -1 || cacheExpireMs == -1
                ? null
                : CacheBuilder.newBuilder()
                        .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                        .maximumSize(cacheMaxSize)
                        .build();

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM