# flink lookup join hbase demo
## join rowkey
```sql
-- kafka source
CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);

-- lookup source: hbase dimension table
drop table if exists hbase_behavior_conf;
CREATE TEMPORARY TABLE hbase_behavior_conf (
  rowkey STRING
  ,cf ROW(item_id STRING, category_id STRING, behavior STRING, ts TIMESTAMP(3))
) WITH (
  'connector' = 'hbase-2.2'
  ,'zookeeper.quorum' = 'thinkpad:12181'
  ,'table-name' = 'user_log'
  ,'lookup.cache.max-rows' = '10000'
  ,'lookup.cache.ttl' = '10 second'
  ,'lookup.async' = 'true'
);

-- sink table
CREATE TABLE kakfa_join_mysql_demo (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,behavior_map STRING
  ,ts TIMESTAMP(3)
  -- ,primary key (user_id) not enforced
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior_1'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
  LEFT JOIN hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
  ON a.user_id = rowkey
WHERE a.behavior is not null;
```
Testing confirms the HBase dimension-table lookup works as expected: rows are cached, cached entries expire on schedule, and cache misses fall through to a direct HBase query.
* Note: in a rough performance test, the HBase dimension table held over 20,000 rows, every input record's join key was the HBase table's rowkey, and `lookup.cache.ttl` was set to 1 minute; the join easily reached ~20,000 TPS.
## join non-rowkey
When joining on a non-rowkey column, the HBase dimension table is read in full at startup and cached in memory, after which the HBase source task switches to FINISHED.
```sql
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
  LEFT JOIN hbase_behavior_conf c ON a.item_id = cf.item_id
WHERE a.behavior is not null;
```
* Note: in Flink 1.13, a job in this state cannot complete a checkpoint:
```
2021-08-20 09:12:39,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, hbase_behavior_conf, project=[cf]]], fields=[cf]) -> Calc(select=[cf, cf.item_id AS $f2]) (1/1) (55be6048b81e2eef95f25d78b3705a34) switched from RUNNING to FINISHED.
2021-08-20 09:13:30,356 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job 971bbb75c7334b0c2b9d218005efbf00 since some tasks of job 971bbb75c7334b0c2b9d218005efbf00 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
```
An HBase join only uses the lookup path when the join key is the HBase table's rowkey. With a non-rowkey key, the table is loaded once at startup, the dimension data can never be refreshed, and checkpoints cannot complete, which breaks the Flink job's consistency guarantees.
In earlier versions, when the SQL support was less complete, we joined against HBase with a UDF instead: inside the UDF you can manage your own cache and fall through to HBase on a miss, which is also quite flexible. (I couldn't find the old code; I'll rewrite it when I have time, which could make another blog post.)
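The cache-aside pattern such a UDF relies on — a bounded local cache with TTL expiry, falling through to the dimension table on a miss — is the same behavior the connector exposes via `lookup.cache.max-rows` and `lookup.cache.ttl`. A minimal sketch of that pattern follows; all names here are illustrative (this is not Flink or HBase API code), and the HBase query is stubbed out by an injected `fetch` callable:

```python
import time
from collections import OrderedDict


class LookupCache:
    """Illustrative sketch of a bounded TTL lookup cache,
    analogous to lookup.cache.max-rows / lookup.cache.ttl."""

    def __init__(self, max_rows, ttl_seconds, fetch):
        self.max_rows = max_rows
        self.ttl = ttl_seconds
        self.fetch = fetch            # stands in for the real HBase query
        self.entries = OrderedDict()  # key -> (value, expire_at)

    def get(self, key, now=None):
        now = time.time() if now is None else now
        hit = self.entries.get(key)
        if hit is not None and hit[1] > now:
            return hit[0]            # fresh cache hit, no HBase round trip
        value = self.fetch(key)      # miss or expired: query the dimension table
        self.entries[key] = (value, now + self.ttl)
        self.entries.move_to_end(key)
        if len(self.entries) > self.max_rows:
            self.entries.popitem(last=False)  # evict the oldest row
        return value
```

Within the TTL a key is served from memory; after expiry the next access refetches it, and when the cache exceeds `max_rows` the oldest entry is evicted — which matches the observed behavior that the cache expires on schedule and misses pass through to HBase.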
Welcome to follow the Flink菜鳥 WeChat official account for occasional posts on Flink development.