Flink 1.12 的時候嘗試使用 JDBC SQL Connector kafka 流關聯 mysql 表,使用 lookup cache 緩存 mysql 數據,測試在關聯性能和更新時效的平衡。不過遭遇了失敗,嘗試各種 join 也無法實現,mysql source 使用 InputFormatSource 一次性把 mysql 的數據讀完,mysql source 就退出了。
在 Flink 1.13 的 SQL 文檔看到這個: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#lookup-join
-- Customers is backed by the JDBC connector and can be used for lookup joins CREATE TEMPORARY TABLE Customers ( id INT, name STRING, country STRING, zip STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', 'table-name' = 'customers' ); -- enrich each order with customer information SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS o JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
jdbc connector 關於 Lookup Cache 的描述還是和上一版本一樣的
JDBC 連接器可以用在時態表關聯中作為一個可 lookup 的 source (又稱為維表),當前只支持同步的查找模式。 默認情況下,lookup cache 是未啟用的,你可以設置 lookup.cache.max-rows and lookup.cache.ttl 參數來啟用。 lookup cache 的主要目的是用於提高時態表關聯 JDBC 連接器的性能。默認情況下,lookup cache 不開啟,所以所有請求都會發送到外部數據庫。
當 lookup cache 被啟用時,每個進程(即 TaskManager)將維護一個緩存。
Flink 將優先查找緩存,只有當緩存未查找到時才向外部數據庫發送請求,並使用返回的數據更新緩存。
當緩存命中最大緩存行 lookup.cache.max-rows 或當行超過最大存活時間 lookup.cache.ttl 時,緩存中最老的行將被設置為已過期。
緩存中的記錄可能不是最新的,用戶可以將 lookup.cache.ttl 設置為一個更小的值以獲得更好的刷新數據,但這可能會增加發送到數據庫的請求數。
所以要做好吞吐量和正確性之間的平衡。
測試SQL 如下
CREATE TABLE user_log ( user_id STRING ,item_id STRING ,category_id STRING ,behavior STRING ,ts TIMESTAMP(3) ,process_time as proctime() , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka' ,'topic' = 'user_behavior' ,'properties.bootstrap.servers' = 'localhost:9092' ,'properties.group.id' = 'user_log' ,'scan.startup.mode' = 'group-offsets' ,'format' = 'json' ); CREATE TEMPORARY TABLE mysql_behavior_conf ( id int ,code STRING ,map_val STRING ,update_time TIMESTAMP(3) -- ,primary key (id) not enforced -- ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'jdbc' ,'url' = 'jdbc:mysql://localhost:3306/venn' ,'table-name' = 'lookup_join_config' ,'username' = 'root' ,'password' = '123456' ,'lookup.cache.max-rows' = '1000' ,'lookup.cache.ttl' = '1 minute' -- 緩存時間,即使一直在訪問也會刪除 ); SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.map_val, a.ts FROM user_log a left join mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c ON a.behavior = c.code where a.behavior is not null;
Lookup Cache 嘗試成功
查看 mysql Lookup Join 的源碼: LookupJoinRunner
@Override public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception { collector.setCollector(out); collector.setInput(in); collector.reset(); // fetcher has copied the input field when object reuse is enabled fetcher.flatMap(in, getFetcherCollector()); if (isLeftOuterJoin && !collector.isCollected()) { outRow.replace(in, nullRow); outRow.setRowKind(in.getRowKind()); out.collect(outRow); } }
getFetcherCollector 方法調用 JdbcRowDataLookupFunction.java,查詢和返回緩存的地方
/** * This is a lookup method which is called by Flink framework in runtime. * * @param keys lookup keys */ public void eval(Object... keys) { // 獲取 key RowData keyRow = GenericRowData.of(keys); // 如果沒有緩存 if (cache != null) { // 獲取緩存 List<RowData> cachedRows = cache.getIfPresent(keyRow); if (cachedRows != null) { for (RowData cachedRow : cachedRows) { collect(cachedRow); } return; } } // 從源端重新獲取數據 for (int retry = 0; retry <= maxRetryTimes; retry++) { try { statement.clearParameters(); statement = lookupKeyRowConverter.toExternal(keyRow, statement); // 查詢 數據庫 try (ResultSet resultSet = statement.executeQuery()) { if (cache == null) { while (resultSet.next()) { collect(jdbcRowConverter.toInternal(resultSet)); } } else { ArrayList<RowData> rows = new ArrayList<>(); while (resultSet.next()) { RowData row = jdbcRowConverter.toInternal(resultSet); rows.add(row); // 返回數據 collect(row); } rows.trimToSize(); // 放入緩存 cache.put(keyRow, rows); } } break; } catch (SQLException e) { ...略... } } }
緩存使用 guava 的 LocalCache
注: ttl 時間是 expireAfterWrite,寫入后固定時間時效
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder() .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) .maximumSize(cacheMaxSize) .build();
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文