開發一個Flink應用


步驟列表
本次實戰經歷以下步驟:

創建應用;
編碼;
構建;
提交任務到Flink,驗證功能;

環境信息
Flink:1.7;
Flink所在機器的操作系統:CentOS Linux release 7.5.1804;
開發環境JDK:1.8.0_181;
開發環境Maven:3.5.0;

應用功能簡介

 在Flink運行SocketWindowWordCount.jar,實現的功能是從socket讀取字符串,將其中的每個單詞的數量統計出來,今天我們就來編碼開發這個應用,實現此功能;

1、應用基本代碼是通過mvn命令創建的,在命令行輸入以下命令

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0

2、按控制台的提示輸入groupId、artifactId、version、package等信息,一路回車確認后,會生成一個和你輸入的artifactId同名的文件夾,里面是個maven工程:

Define value for property 'groupId': com.hjp
Define value for property 'artifactId': socketwordcountdemo
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.hjp: :
Confirm properties configuration:
groupId: com.hjp
artifactId: socketwordcountdemo
version: 1.0-SNAPSHOT
package: com.hjp

3、用IEDA導入這個maven工程,如下圖,已經有了兩個類:BatchJob和StreamingJob,BatchJob是用於批處理的,本次實戰用不上,因此可以刪除,只保留流處理的StreamingJob:

應用創建成功,接下來可以開始編碼了;

在StreamingJob類中添加靜態內部類WordWithCount,這是個PoJo,用來保存一個具體的單詞及其出現頻率:

	 /**
	 * 記錄單詞及其出現頻率的Pojo
	 */
	public static class WordWithCount {
		/**
		 * 單詞內容
		 */
		public String word;

		/**
		 * 出現頻率
		 */
		public long count;

		public WordWithCount() {
			super();
		}

		public WordWithCount(String word, long count) {
			this.word = word;
			this.count = count;
		}

		/**
		 * 將單詞內容和頻率展示出來
		 * @return
		 */
		@Override
		public String toString() {
			return word + " : " + count;
		}
	}

把所有業務邏輯寫在StreamJob類的main方法中,如下所示,關鍵位置都加了中文注釋:

 

public static void main(String[] args) throws Exception {

		//環境信息
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		//數據來源是本機9999端口,換行符分隔,您也可以考慮將hostname和port參數通過main方法的入參傳入
		DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

		//通過text對象轉換得到新的DataStream對象,
		//轉換邏輯是分隔每個字符串,取得的所有單詞都創建一個WordWithCount對象
		DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
			@Override
			public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
				for(String word : s.split("\\s")){
					collector.collect(new WordWithCount(word, 1L));
				}
			}
		})
				.keyBy("word")//key為word字段
				.timeWindow(Time.seconds(5))	//五秒一次的翻滾時間窗口
				.reduce(new ReduceFunction<WordWithCount>() { //reduce策略
					@Override
					public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
						return new WordWithCount(a.word, a.count+b.count);
					}
				});


		//單線程輸出結果
		windowCounts.print().setParallelism(1);

		// 執行
		env.execute("Flink Streaming Java API Skeleton");
	}

 

構建

maven 打包

在Flink驗證

1、登錄到Flink所在機器,執行以下命令:

2、我這邊Flink所在機器的IP地址是192.168.11.107,因此用瀏覽器訪問的Flink的web地址為:http://192.168.11.107:8081;

3、選擇剛剛生成的jar文件作為一個新的任務,如下圖:

 

4、點擊下圖紅框中的"upload",將文件提交:

5、目前還只是將jar文件上傳了而已,接下來就是手工設置執行類並啟動任務,操作如下圖,填寫前面編寫的StreamingJob類的完整名稱:

6、提交后的頁面效果如下圖所示,可見一個job已經在運行中了:

7、回到Flink所在機器的控制台,在之前輸入了nc -l 9999的窗口輸入一些英文句子,然后按下回車鍵,例如:

8、接下來看看我們的job的執行效果,如下圖,點擊左側的"Task Managers",在右邊的列表中只有一個Task,點擊它:

9、點擊"Stdout"這個tab,就能見到我們的任務對之前句子中的單詞的統計結果,如下圖:

 

至此,第一個比較簡單的FLINK就完成了。

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM