Java應用程序遠程提交FLink任務


1 解決問題

解決了flink任務提交依賴傳統Jar提交的問題,改為Java應用程序獲取RemoteEnvironment方式提交,便於維護管理等。

通過次提交方式,可以做進一步的延伸,通過Flink版本管理,Sql管理。只需要簡單的存儲版本信息,某個任務的Sql信息,就能快速實現任務提交,以此來摒棄傳統的Jar任務提交。進一步來講,Flink越來越重視FlinkSql,從Flink的更新,以及維護來看,Flink的未來將着重於SQL,以高級的SQL API取締其他API。所以,總的來說Flink Sql有無限前景。

2 測試用例

測試從kafka消費數據,保存到Mysql,此測試用例不涉及任何業務,且無實際意義,只是為了實現JavaAPI提交。

當然,你也可以實現Mysql -> Mysql

3 代碼實現

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);
        String kafkaFK = "CREATE TABLE test_fk (    " +
                "  `id` BIGINT,    " +
                "  `num` INT,    " +
                "   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'    " +
                ") WITH (    " +
                "  'connector' = 'kafka',    " +
                "  'topic' = 'TEST_FK',    " +
                "  'properties.bootstrap.servers' = 'gcw1:9092',    " +
                "  'scan.startup.mode' = 'earliest-offset',    " +
                "  'format' = 'csv'    " +
                ")";
        String mysqlFK = " CREATE TABLE test_demo (  " +
                "  id BIGINT,  " +
                "  ct_num BIGINT,  " +
                "  submit_time  TIMESTAMP(3) ,  " +
                "  PRIMARY KEY (id) NOT ENFORCED  " +
                ") WITH (  " +
                "   'connector' = 'jdbc',  " +
                "   'url' = 'jdbc:mysql://gcw3:3306/test',  " +
                "   'table-name' = 'test_demo',  " +
                "   'username' = 'root',  " +
                "   'password' = '123456'  " +
                ")";
        stEnv.executeSql(kafkaFK);
        stEnv.executeSql(mysqlFK);
        TableResult tableResult = stEnv.executeSql("insert into test_demo select id,sum(num),max(ts) from test_fk group by id");

        //獲取任務id
        Optional<JobClient> jobClient = tableResult.getJobClient();
        JobClient jobClient1 = jobClient.get();
        JobID jobID = jobClient1.getJobID();
        System.out.println(jobID);

4 實現演示

演示用到了kafka集群,flink的standalone模式,請確保flink節點中kafka可以使用

flink-sb-1

5 實現過程中遇到的問題

5.1 版本問題

  • 確保應用程序與Flink集群版本一致,否則可能會有問題 😆
  • kafka的連接器要選擇sql-connector的
  • 確保你的pom文件合適,下面提供了項目地址,你可查看我的pom

5.2 Flink集群節點問題

  • 確保每個Flink的lib下有你需要的連接器,連接驅動等
  • 確保Flink每個節點能使用Kafka(如果你不是用kafka可以略過)

6 代碼地址

gitee地址

7 有任何問題歡迎留言討論


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM