Flink 實踐教程-入門(9):Jar 作業開發


​作者:騰訊雲流計算 Oceanus 團隊

 

流計算 Oceanus 簡介

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。
Flink Jar 作業既支持使用 DataStream API 編程也支持使用 Table API/SQL 編程, Table API 和 SQL 也可以很容易地集成並嵌入到 DataStream 程序中,請參見 與 DataStream API 集成 [1] 章節了解如何將 DataStream 與 Table 之間的相互轉化。流計算 Oceanus 支持 Flink Jar 作業和 Flink SQL 作業,本文將向您詳細介紹如何使用 Flink DataStream API 進行 Jar 作業開發,並在流計算 Oceanus 平台運行。

 

操作視頻

前置准備

創建流計算 Oceanus 集群

進入流計算 Oceanus 控制台 [2],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔 創建獨享集群 [3]。

創建消息隊列 CKafka

進入 CKafka 控制台 [4],點擊左上角【新建】,即可完成 CKafka 的創建,具體可參考 CKafka 創建實例 [5]。

創建 Topic:

進入 CKafka 實例,點擊【topic 管理】>【新建】,即可完成 Topic 的創建,具體可參考 CKafka 創建 Topic [6]。

開發 DataStream 作業

1. 新建 Maven 工程

在本地 IDEA 中新建Maven Project,並配置 pom.xml 文件。pom.xml 文件內容如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.oceanus</groupId> <artifactId>jar_demos</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- Oceanus 平台自帶了 flink-java、flink-streaming 等依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <!-- 使用 Oceanus 內置 Kafka Connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <!-- test --> <!-- flink-clients 用於本地調試 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.13.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>3.2.0</version> <configuration> <!-- 設置主類 --> <archive> <manifestEntries> <Main-Class>com.demos.HelloWorld</Main-Class> </manifestEntries> </archive> </configuration> </plugin> </plugins> </build></project>

2. 代碼編寫

Flink DataStream 作業代碼如下:

package com.demos;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.util.Collector;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class HelloWorld { public static void main(String[] args) throws Exception { // 1. 設置運行環境 StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); List<Integer> data = new ArrayList<>(); for (int i = 0; i < 100; i++) { data.add(i); } // 2. 配置數據源讀取數據 // 預定義數據源支持從文件、套接字、集合讀入數據;自定義數據源支持 Kafka、MySQL 等使用 addSource() 函數讀入數據 DataStreamSource<List<Integer>> dataStream = sEnv.fromElements(data); // 3. 數據加工 DataStream ds = dataStream.flatMap(new FlatMapFunction<List<Integer>, String>() { @Override public void flatMap(List<Integer> value, Collector<String> out) throws Exception { value.forEach(v -> out.collect(v.toString())); } }); // 4. 數據輸出 // 預定義目的端支持把數據寫入文件、標准輸出(stdout)、標准錯誤輸出(stderr)和 socket;自定義目的端支持 Kafka、MySQL 等使用 addSink() 函數寫出數據 Properties sinkProps = new Properties(); String hosts = "10.0.0.29:9092"; sinkProps.setProperty("bootstrap.servers", hosts); String outTopic = "flink-demo9"; FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(outTopic, new SimpleStringSchema(), sinkProps); ds.addSink(producer); // ds.print(); // 5. 執行程序 sEnv.execute("helloworld"); }}

 

打包 Jar 包

使用 IDEA 自帶打包工具 Build Artifacts 或者命令行進行打包。命令行打包命令:

mvn clean package

命令行打包后生成的 Jar 包可以在項目 target 目錄下找到,Jar 名為 jar_demos-1.0-SNAPSHOT.jar。 

流計算 Oceanus 作業

1. 上傳依賴

在流計算 Oceanus 控制台,點擊左側【依賴管理】,點擊左上角【新建】新建依賴,上傳本地 Jar 包。

2. 創建作業

在流計算 Oceanus 控制台,點擊左側【作業管理】,點擊左上角【新建】新建作業,作業類型選擇 Jar 作業,點擊【開發調試】進入作業編輯頁面。【主程序包】選擇剛剛上傳的依賴,並選擇最新版本。參考 pom.xml 文件填寫主類,此處填入 com.demos.HelloWorld。

3. 運行作業

點擊【發布草稿】即可運行,可通過【日志】面板 TaskManager 或 Flink UI 查看運行信息。

總結

  1. DataStream 作業支持各類異構數據源與數據目的端。自定義數據源支持 Kafka、MySQL 等,使用 addSource() 函數讀入數據;自定義目的端支持 Kafka、MySQL 等,使用 addSink() 函數寫出數據。

  2. 打包時無需打包 flink 核心依賴,流計算 Oceanus 平台已提供。

 

閱讀參考

[1] 與 DataStream API 集成:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/  

[2]流計算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview  

[3] 創建獨享集群:https://cloud.tencent.com/document/product/849/48298  

[4] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1  

[5] CKafka 創建實例:https://cloud.tencent.com/document/product/597/54839  

[6] Ckafka 創建 Topic:https://cloud.tencent.com/document/product/597/54854  

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

 

​​在這里插入圖片描述

在這里插入圖片描述

 

關注“騰訊雲大數據”公眾號,技術交流、最新活動、服務專享一站Get~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM