注: 對應的sql_lib依賴jar,在參考博客的留言下面有
1、運行f'link sql
1、首先進入flink目錄,啟動flink:bin/start-cluster.sh
2、其次啟動Flink SQL Client:bin/sql-client.sh embedded -l sql_lib
2、啟動界面
3、測試demo
DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( id INT, name STRING, create_time TIMESTAMP(3) ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); CREATE TABLE user_info ( id INT, username STRING, password STRING ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user_info', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' );
4、執行sql,查看結果
select u.id,u.name,u.create_time,ui.id,ui.username,ui.password
from `user` as u
left join user_info as ui
on u.id = ui.id;
===============實現Datax效果========================
DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( id INT, name STRING, create_time TIMESTAMP(3) ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); DROP TABLE IF EXISTS `user_info`; CREATE TABLE user_info ( id INT, username STRING, password STRING ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user_info', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); DROP TABLE IF EXISTS `user_all`; CREATE TABLE user_all ( id INT, name STRING, create_time TIMESTAMP(3), bid INT, username STRING, password STRING ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user_all', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); insert into user_all select u.id,u.name,u.create_time,ui.id as bid,ui.username,ui.password from `user` as u left join user_info as ui on u.id = ui.id group by u.id,u.name,u.create_time,ui.id,ui.username,ui.password ;