flink sql join hbase demo


lookup join mysql demo:  flink lookup join mysql demo

## join rowkey

-- Lookup Source
-- kafka source
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3)
,process_time as proctime()
, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_behavior'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'group-offsets'
,'format' = 'json'
);

drop table if exists hbase_behavior_conf ;
CREATE TEMPORARY TABLE hbase_behavior_conf (
rowkey STRING
,cf ROW(item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3))
) WITH (
'connector' = 'hbase-2.2'
,'zookeeper.quorum' = 'thinkpad:12181'
,'table-name' = 'user_log'
,'lookup.cache.max-rows' = '10000'
,'lookup.cache.ttl' = '10 second'
,'lookup.async' = 'true'
);

---sinkTable
CREATE TABLE kakfa_join_mysql_demo (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,behavior_map STRING
,ts TIMESTAMP(3)
-- ,primary key (user_id) not enforced
) WITH (
'connector' = 'kafka'
,'topic' = 'user_behavior_1'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'group-offsets'
,'format' = 'json'
);

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.user_id = rowkey
where a.behavior is not null;

 

測試 hbase 維表Lookup 功能正常,可以正常緩存數據,緩存也會定時失效,透查Hbase

 


* 注: 隨便測試了一下性能,Hbase 維表有2 萬多條數據,輸入數據的關聯字段都是Hbase 表主鍵,lookup.cache.ttl 為 1分鍾,關聯的 TPS 輕松到達: 2W

## join 非 rowkey

 

join 非主鍵時,hbase 維表啟動時一次性讀取 hbase 表全部數據,緩存到內存中,hbase source 狀態 finish

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf c ON a.item_id = cf.item_id
where a.behavior is not null;

 

 


* 注: 這種情況在 flink 1.13 版本,不能完成 checkpoint

2021-08-20 09:12:39,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, hbase_behavior_conf, project=[cf]]], fields=[cf]) -> Calc(select=[cf, cf.item_id AS $f2]) (1/1) (55be6048b81e2eef95f25d78b3705a34) switched from RUNNING to FINISHED.
2021-08-20 09:13:30,356 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job 971bbb75c7334b0c2b9d218005efbf00 since some tasks of job 971bbb75c7334b0c2b9d218005efbf00 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.

 

Hbase 關聯只有在關聯鍵是Hbase 表的主鍵的時候,才能應用 Lookup 功能,非主鍵一次性加載,維表數據沒辦法更新,而且不能做 Checkpoint 影響 Flink job 的一致性。

在之前版本,SQL 功能不完善的時候,我們使用 UDF 的方式關聯 Hbase,可以在 UDF 里面自己關聯緩存、透查Hbase,也比較靈活。(沒找到之前的代碼,空了自己寫一個,可以再水一篇博客)

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM