Flink SQL JDBC Connector


This article is based on the official Table & SQL Connectors documentation for the JDBC SQL Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache

JDBC dependency:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
```
MySQL is used here, so the MySQL driver dependency is also required:
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql-connector.version}</version>
</dependency>
```

Scan source: bounded

The job executes once: it reads all the rows and then finishes.

```sql
drop table if exists mysql_user_log ;
CREATE TABLE mysql_user_log (
    id int
  ,user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,create_time TIMESTAMP(3)
  ,insert_time TIMESTAMP(3)
  ,primary key (id) not enforced
) WITH (
   'connector' = 'jdbc'
   ,'url' = 'jdbc:mysql://venn:3306/venn'
   ,'table-name' = 'user_log'
   ,'username' = 'root'
   ,'password' = '123456'
);
```
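As a quick check of the bounded-scan behavior, the table can be read with a plain query (a hypothetical verification step, run for example in the SQL client); the job ends as soon as the result set is exhausted:

```sql
-- reads user_log once over JDBC; the job finishes when all rows are fetched
SELECT count(*) AS cnt FROM mysql_user_log;
```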

Note: since Flink does not store the data itself, the primary key is declared NOT ENFORCED, i.e. Flink does not validate it.

Temporal table join

```sql
-- Lookup Source: Sync Mode
-- kafka source
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);
```

```sql
-- mysql source
drop table if exists mysql_behavior_conf ;
CREATE TABLE mysql_behavior_conf (
    id int
  ,behavior VARCHAR
  ,behavior_map VARCHAR
  ,update_time TIMESTAMP(3)
--   ,process_time as proctime()
  ,primary key (id) not enforced
  , WATERMARK FOR update_time AS update_time
) WITH (
   'connector' = 'jdbc'
   ,'url' = 'jdbc:mysql://venn:3306/venn'
   ,'table-name' = 'behavior_conf'
   ,'username' = 'root'
   ,'password' = '123456'
   ,'scan.partition.column' = 'id'
   ,'scan.partition.num' = '1'
   ,'scan.partition.lower-bound' = '0'
   ,'scan.partition.upper-bound' = '9999'
   ,'lookup.cache.max-rows' = '1000'
   ,'lookup.cache.ttl' = '2 minute'
);
```
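The `scan.partition.*` options split the bounded read into parallel range queries on the partition column (`WHERE id BETWEEN ? AND ?`). The even split can be sketched as follows; this is a simplified illustration, not Flink's exact implementation:

```python
def partition_ranges(lower, upper, num_partitions):
    """Split the inclusive range [lower, upper] into num_partitions
    contiguous sub-ranges, roughly mirroring how a partitioned JDBC
    scan derives its per-partition BETWEEN queries."""
    total = upper - lower + 1
    base, remainder = divmod(total, num_partitions)
    ranges = []
    start = lower
    for i in range(num_partitions):
        # the first `remainder` partitions get one extra row
        size = base + (1 if i < remainder else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges

# with the settings above (lower=0, upper=9999, num=1),
# a single partition covers the whole range
print(partition_ranges(0, 9999, 1))   # [(0, 9999)]
print(partition_ranges(0, 9999, 4))   # [(0, 2499), (2500, 4999), (5000, 7499), (7500, 9999)]
```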

```sql
-- sink table
CREATE TABLE kakfa_join_mysql_demo (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,behavior_map VARCHAR
  ,ts TIMESTAMP(3)
  ,primary key (user_id) not enforced
) WITH (
  'connector' = 'upsert-kafka'
  ,'topic' = 'user_behavior_sink'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'key.format' = 'json'
  ,'key.json.ignore-parse-errors' = 'true'
  ,'value.format' = 'json'
  ,'value.json.fail-on-missing-field' = 'false'
  ,'value.fields-include' = 'ALL'
);
```

```sql
-- sink: the join uses the event-time field of the left table
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
FROM user_log a
  left join mysql_behavior_conf for system_time as of a.ts as b on a.behavior = b.id
where a.behavior is not null;
```

Data in the mysql_behavior_conf table:

```
id,behavior,behavior_map,update_time
1,pv,pv-map,2021-02-01 06:10:54
2,buy,buy-map,2021-02-01 06:10:56
3,cart,cart-map,2021-02-01 06:10:58
4,fav,fav-map,2021-02-01 06:11:00
5,pv_0,map_0,2021-02-02 07:41:24
6,pv_1,map_1,2021-02-02 07:41:25
7,pv_2,map_2,2021-02-02 07:41:25
8,pv_3,map_3,2021-02-02 07:41:26
9,pv_4,map_4,2021-02-02 07:41:26
10,pv_5,map_5,2021-02-02 07:41:26
```

... and so on, up to id 10000.

user_log data:

```json
{"user_id": "652863", "item_id":"4967749", "category_id": "1320293", "behavior": "1", "ts": "2021-02-02 14:50:00"}
{"user_id": "801610", "item_id":"900305", "category_id": "634390", "behavior": "2", "ts": "2021-02-02 14:50:11"}
{"user_id": "411478", "item_id":"3259235", "category_id": "2667323", "behavior": "3", "ts": "2021-02-02 14:50:22"}
{"user_id": "431664", "item_id":"764155", "category_id": "2520377", "behavior": "4", "ts": "2021-02-02 14:50:33"}
{"user_id": "431664", "item_id":"764155", "category_id": "2520377", "behavior": "1001", "ts": "2021-02-02 16:51:58"}
```

The output is as follows:

```json
{"user_id":"user_id_813","item_id":"item_id_813","category_id":"category_id_813","behavior":813,"behavior_map":"map_808","ts":"2021-02-02 16:50:40"}
{"user_id":"user_id_633","item_id":"item_id_633","category_id":"category_id_633","behavior":633,"behavior_map":"map_628","ts":"2021-02-02 16:50:44"}
{"user_id":"user_id_8425","item_id":"item_id_8425","category_id":"category_id_8425","behavior":8425,"behavior_map":null,"ts":"2021-02-02 16:50:48"}
{"user_id":"user_id_8701","item_id":"item_id_8701","category_id":"category_id_8701","behavior":8701,"behavior_map":null,"ts":"2021-02-02 16:50:52"}
{"user_id":"user_id_9983","item_id":"item_id_9983","category_id":"category_id_9983","behavior":9983,"behavior_map":null,"ts":"2021-02-02 16:50:56"}
{"user_id":"431664","item_id":"764155","category_id":"2520377","behavior":7000,"behavior_map":null,"ts":"2021-02-02 16:51:56"}
```

The parameters 'scan.partition.lower-bound' and 'scan.partition.upper-bound' take effect, but 'lookup.cache.max-rows' and 'lookup.cache.ttl' do not.
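For reference, 'lookup.cache.max-rows' and 'lookup.cache.ttl' describe a bounded cache with per-entry expiry. Its documented behavior can be modeled roughly like this (an illustrative sketch, not the connector's actual code):

```python
import time
from collections import OrderedDict

class LookupCache:
    """Model of a size- and TTL-bounded lookup cache, in the spirit of
    'lookup.cache.max-rows' / 'lookup.cache.ttl'."""

    def __init__(self, max_rows, ttl_seconds, clock=time.monotonic):
        self.max_rows = max_rows
        self.ttl = ttl_seconds
        self.clock = clock          # injectable for testing
        self._data = OrderedDict()  # key -> (value, expiry timestamp)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None             # miss: a real lookup would query the DB
        value, expiry = entry
        if self.clock() >= expiry:  # expired: drop it and report a miss
            del self._data[key]
            return None
        return value

    def put(self, key, value):
        if key in self._data:
            del self._data[key]
        elif len(self._data) >= self.max_rows:
            self._data.popitem(last=False)  # evict the oldest entry
        self._data[key] = (value, self.clock() + self.ttl)
```

With `max_rows=1000` and `ttl=120` seconds, an entry is served from the cache until either two minutes pass or it is evicted to make room, at which point the database would be queried again.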

Rows where behavior_map is null are the ones that did not match any record in the dimension table.

According to the official documentation, a cache miss should trigger a query against the database. In practice, however, the table was not processed as a Lookup Source at all: it behaved like a one-shot InputFormat source that read all of the MySQL data once, after which the MySQL source finished. (This is likely because the join uses the event-time attribute a.ts; the JDBC lookup source, and therefore the lookup cache, is only used in a processing-time temporal join.)
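For a true lookup join that exercises the cache, the temporal join should reference a processing-time attribute rather than the event-time field. A hedged sketch reusing the tables above (user_log already defines process_time; the watermark on mysql_behavior_conf would be dropped):

```sql
-- lookup join keyed on processing time: each probe row triggers a
-- point query against MySQL (or a cache hit) instead of a bounded scan
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
FROM user_log a
  LEFT JOIN mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS b
    ON a.behavior = b.id
WHERE a.behavior IS NOT NULL;
```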

The result looks no different from a regular join (the event-time attribute just needs to be removed from the user_log table): the dimension table is read once in full, and the source then finishes.

```sql
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
FROM user_log a
  left join mysql_behavior_conf b on a.behavior = b.id
where a.behavior is not null;
```

Feel free to follow the Flink 菜鳥 WeChat official account for occasional posts on Flink development topics.

