This post is based on the official Table & SQL Connectors documentation, JDBC SQL Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache
JDBC dependency:

```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.12.0</version>
</dependency>
```
MySQL is used here, so the MySQL JDBC driver dependency is also needed:
```xml
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector.version}</version>
</dependency>
```
Scan Source: Bounded

A scan source runs once: the job finishes as soon as the whole table has been read.
```sql
drop table if exists mysql_user_log;
CREATE TABLE mysql_user_log (
  id int
  ,user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,create_time TIMESTAMP(3)
  ,insert_time TIMESTAMP(3)
  ,primary key (id) not enforced
) WITH (
  'connector' = 'jdbc'
  ,'url' = 'jdbc:mysql://venn:3306/venn'
  ,'table-name' = 'user_log'
  ,'username' = 'root'
  ,'password' = '123456'
);
```
Note: since Flink itself does not store the data, the primary key has to be declared 'not enforced'; Flink does not validate the constraint.
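As a minimal illustration (my own sketch, assuming the mysql_user_log table defined above), a plain query is enough to trigger the one-shot bounded scan:

```sql
-- One-shot bounded scan: the JDBC source reads the whole user_log table
-- once, emits all rows, and the source then finishes.
SELECT id, user_id, item_id, category_id, behavior, ts
FROM mysql_user_log;
```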
Temporal table join
```sql
-- Lookup Source: Sync Mode
-- kafka source
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  ,WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);

-- mysql source
drop table if exists mysql_behavior_conf;
CREATE TABLE mysql_behavior_conf (
  id int
  ,behavior VARCHAR
  ,behavior_map VARCHAR
  ,update_time TIMESTAMP(3)
  -- ,process_time as proctime()
  ,primary key (id) not enforced
  ,WATERMARK FOR update_time AS update_time
) WITH (
  'connector' = 'jdbc'
  ,'url' = 'jdbc:mysql://venn:3306/venn'
  ,'table-name' = 'behavior_conf'
  ,'username' = 'root'
  ,'password' = '123456'
  ,'scan.partition.column' = 'id'
  ,'scan.partition.num' = '1'
  ,'scan.partition.lower-bound' = '0'
  ,'scan.partition.upper-bound' = '9999'
  ,'lookup.cache.max-rows' = '1000'
  ,'lookup.cache.ttl' = '2 minute'
);

-- sink table
CREATE TABLE kakfa_join_mysql_demo (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,behavior_map VARCHAR
  ,ts TIMESTAMP(3)
  ,primary key (user_id) not enforced
) WITH (
  'connector' = 'upsert-kafka'
  ,'topic' = 'user_behavior_sink'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'key.format' = 'json'
  ,'key.json.ignore-parse-errors' = 'true'
  ,'value.format' = 'json'
  ,'value.json.fail-on-missing-field' = 'false'
  ,'value.fields-include' = 'ALL'
);

-- sink: temporal join on the left table's event-time attribute
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
FROM user_log a
left join mysql_behavior_conf for system_time as of a.ts as b on a.behavior = b.id
where a.behavior is not null;
```
Data in the mysql_behavior_conf table:
```
id,behavior,behavior_map,update_time
1,pv,pv-map,2021-02-01 06:10:54
2,buy,buy-map,2021-02-01 06:10:56
3,cart,cart-map,2021-02-01 06:10:58
4,fav,fav-map,2021-02-01 06:11:00
5,pv_0,map_0,2021-02-02 07:41:24
6,pv_1,map_1,2021-02-02 07:41:25
7,pv_2,map_2,2021-02-02 07:41:25
8,pv_3,map_3,2021-02-02 07:41:26
9,pv_4,map_4,2021-02-02 07:41:26
10,pv_5,map_5,2021-02-02 07:41:26
```
... continuing up to id 10000.
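For completeness, a plausible MySQL-side DDL for the underlying behavior_conf table; this is an assumption inferred from the Flink DDL and the sample rows above, not taken from the original post:

```sql
-- Hypothetical MySQL DDL matching the Flink table definition above
CREATE TABLE behavior_conf (
  id           INT PRIMARY KEY,
  behavior     VARCHAR(64),
  behavior_map VARCHAR(64),
  update_time  TIMESTAMP(3)
);
```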
user_log data:
{"user_id": "652863", "item_id":"4967749", "category_id": "1320293", "behavior": "1", "ts": "2021-02-02 14:50:00"} {"user_id": "801610", "item_id":"900305", "category_id": "634390", "behavior": "2", "ts": "2021-02-02 14:50:11"} {"user_id": "411478", "item_id":"3259235", "category_id": "2667323", "behavior": "3", "ts": "2021-02-02 14:50:22"} {"user_id": "431664", "item_id":"764155", "category_id": "2520377", "behavior": "4", "ts": "2021-02-02 14:50:33"} {"user_id": "431664", "item_id":"764155", "category_id": "2520377", "behavior": "1001", "ts": "2021-02-02 16:51:58"}
The output is as follows:
{"user_id":"user_id_813","item_id":"item_id_813","category_id":"category_id_813","behavior":813,"behavior_map":"map_808","ts":"2021-02-02 16:50:40"} {"user_id":"user_id_633","item_id":"item_id_633","category_id":"category_id_633","behavior":633,"behavior_map":"map_628","ts":"2021-02-02 16:50:44"} {"user_id":"user_id_8425","item_id":"item_id_8425","category_id":"category_id_8425","behavior":8425,"behavior_map":null,"ts":"2021-02-02 16:50:48"} {"user_id":"user_id_8701","item_id":"item_id_8701","category_id":"category_id_8701","behavior":8701,"behavior_map":null,"ts":"2021-02-02 16:50:52"} {"user_id":"user_id_9983","item_id":"item_id_9983","category_id":"category_id_9983","behavior":9983,"behavior_map":null,"ts":"2021-02-02 16:50:56"} {"user_id":"431664","item_id":"764155","category_id":"2520377","behavior":7000,"behavior_map":null,"ts":"2021-02-02 16:51:56"}
In this test, the parameters 'scan.partition.lower-bound' and 'scan.partition.upper-bound' take effect, while 'lookup.cache.max-rows' and 'lookup.cache.ttl' do not.
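The scan.partition.* options only shape the bounded scan: they split the read into parallel range queries over the partition column. A rough sketch of the query the source would issue here (illustrative; with 'scan.partition.num' = '1' there is a single split covering the whole range):

```sql
-- Approximate shape of the split query generated for
-- scan.partition.column = 'id', lower-bound = 0, upper-bound = 9999,
-- scan.partition.num = 1:
SELECT id, behavior, behavior_map, update_time
FROM behavior_conf
WHERE id BETWEEN 0 AND 9999;
```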
Rows where behavior_map is null are the ones that did not match any record in the dimension table.
The official documentation says that on a cache miss the connector queries the database. In this test, however, the table was not processed as a Lookup Source at all: it behaved like a one-shot InputFormatSource that read all of the MySQL data up front, after which the MySQL source exited. (Perhaps the usage is wrong and the Lookup Source code path was never triggered.)
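A likely explanation (my assumption, not verified in this post): in Flink 1.12 the JDBC table only acts as a lookup source, with the lookup.cache.* options applying, when the temporal join is declared against a processing-time attribute of the left table. Joining FOR SYSTEM_TIME AS OF the event time a.ts instead selects the event-time temporal join, which scans the bounded JDBC source once. A sketch of the processing-time variant:

```sql
-- Lookup-join variant: use the left table's processing-time attribute
-- (process_time, declared as proctime() in user_log above). In this form
-- the JDBC connector should query MySQL per join key, with
-- 'lookup.cache.max-rows' and 'lookup.cache.ttl' controlling the cache.
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
FROM user_log a
LEFT JOIN mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS b
  ON a.behavior = b.id
WHERE a.behavior IS NOT NULL;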
The effect looks no different from a regular join (for the regular join, the event-time attribute has to be removed from the user_log table): the dimension table is read once, and then the source finishes.
```sql
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
FROM user_log a
left join mysql_behavior_conf b on a.behavior = b.id
where a.behavior is not null;
```
Feel free to follow the Flink菜鳥 WeChat official account, which posts articles on Flink development from time to time.