Flink Sql Client初探


注:對應的 sql_lib 依賴 jar 包,可在參考博客的留言區找到。

1、運行Flink SQL

1、首先進入flink目錄,啟動flink:bin/start-cluster.sh
2、其次啟動Flink SQL Client:bin/sql-client.sh embedded -l sql_lib

2、啟動界面

3、測試demo

-- Demo table `user`: a Flink table mapped onto the MySQL table `user` in
-- database `test` on localhost:3306 through the JDBC connector.
-- `user` is a reserved word, so the identifier is quoted with backticks.
DROP TABLE IF EXISTS `user`;

CREATE TABLE `user` (
    id          INT,
    name        STRING,
    create_time TIMESTAMP(3)
)
WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    -- Demo-only credentials for a local MySQL instance.
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- Lookup cache bounds: at most 5000 rows, entries kept for 10min.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);

-- Demo table user_info: a Flink table mapped onto the MySQL table `user_info`
-- in database `test` on localhost:3306 through the JDBC connector.
-- Drop any earlier registration first so the demo can be re-run in the same
-- session without a "table already exists" failure.
DROP TABLE IF EXISTS `user_info`;

CREATE TABLE user_info (
    id INT,
    username STRING,
    password STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/test',
    'connector.table' = 'user_info',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    -- Demo-only credentials for a local MySQL instance.
    'connector.username' = 'root',
    'connector.password' = 'root',
    -- Lookup cache bounds: at most 5000 rows, entries kept for 10min.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

4、執行sql,查看結果

-- Left-join every `user` row with the user_info row that has the same id;
-- the user_info columns are NULL for users without a match.
SELECT
    u.id,
    u.name,
    u.create_time,
    ui.id,
    ui.username,
    ui.password
FROM `user` AS u
LEFT JOIN user_info AS ui
    ON u.id = ui.id;

 

===============實現DataX效果========================

-- First input of the sync job: Flink table `user` mapped onto the MySQL
-- table `user` in database `test` on localhost:3306 via the JDBC connector.
-- `user` is a reserved word, so the identifier is quoted with backticks.
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
    id          INT,
    name        STRING,
    create_time TIMESTAMP(3)
)
WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    -- Demo-only credentials for a local MySQL instance.
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- Lookup cache bounds: at most 5000 rows, entries kept for 10min.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);

-- Second input of the sync job: Flink table user_info mapped onto the MySQL
-- table `user_info` in database `test` on localhost:3306 via the JDBC connector.
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE user_info (
    id       INT,
    username STRING,
    password STRING
)
WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user_info',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    -- Demo-only credentials for a local MySQL instance.
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- Lookup cache bounds: at most 5000 rows, entries kept for 10min.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);


-- Target of the sync job: Flink table user_all mapped onto the MySQL table
-- `user_all` in database `test` on localhost:3306 via the JDBC connector.
-- Its columns are the `user` columns followed by the user_info columns, with
-- the user_info id stored as bid.
DROP TABLE IF EXISTS `user_all`;
CREATE TABLE user_all (
    id          INT,
    name        STRING,
    create_time TIMESTAMP(3),
    bid         INT,
    username    STRING,
    password    STRING
)
WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user_all',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    -- Demo-only credentials for a local MySQL instance.
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- NOTE(review): this table is only written to in this walkthrough, so the
    -- lookup cache options below are presumably unused here; confirm against
    -- the JDBC connector documentation before relying on or removing them.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);


-- Write the `user` LEFT JOIN user_info result into user_all; the user_info id
-- is projected as bid to match the target schema.
-- NOTE(review): the GROUP BY lists every selected column and there is no
-- aggregate, so it only collapses duplicate rows. Presumably it is also what
-- lets the JDBC sink derive a key for this updating join result; confirm
-- before removing it.
INSERT INTO user_all
SELECT
    u.id,
    u.name,
    u.create_time,
    ui.id AS bid,
    ui.username,
    ui.password
FROM `user` AS u
LEFT JOIN user_info AS ui
    ON u.id = ui.id
GROUP BY
    u.id,
    u.name,
    u.create_time,
    ui.id,
    ui.username,
    ui.password;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM