步驟列表
本次實戰經歷以下步驟:
創建應用;
編碼;
構建;
提交任務到Flink,驗證功能;
環境信息
Flink:1.7;
Flink所在機器的操作系統:CentOS Linux release 7.5.1804;
開發環境JDK:1.8.0_181;
開發環境Maven:3.5.0;
應用功能簡介
在Flink運行SocketWindowWordCount.jar,實現的功能是從socket讀取字符串,將其中的每個單詞的數量統計出來,今天我們就來編碼開發這個應用,實現此功能;
1、應用基本代碼是通過mvn命令創建的,在命令行輸入以下命令
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
2、按控制台的提示輸入groupId、artifactId、version、package等信息,一路回車確認后,會生成一個和你輸入的artifactId同名的文件夾,里面是個maven工程:
Define value for property 'groupId': com.hjp Define value for property 'artifactId': socketwordcountdemo Define value for property 'version' 1.0-SNAPSHOT: : Define value for property 'package' com.hjp: : Confirm properties configuration: groupId: com.hjp artifactId: socketwordcountdemo version: 1.0-SNAPSHOT package: com.hjp
3、用IEDA導入這個maven工程,如下圖,已經有了兩個類:BatchJob和StreamingJob,BatchJob是用於批處理的,本次實戰用不上,因此可以刪除,只保留流處理的StreamingJob:
應用創建成功,接下來可以開始編碼了;
在StreamingJob類中添加靜態內部類WordWithCount,這是個PoJo,用來保存一個具體的單詞及其出現頻率:
/** * 記錄單詞及其出現頻率的Pojo */ public static class WordWithCount { /** * 單詞內容 */ public String word; /** * 出現頻率 */ public long count; public WordWithCount() { super(); } public WordWithCount(String word, long count) { this.word = word; this.count = count; } /** * 將單詞內容和頻率展示出來 * @return */ @Override public String toString() { return word + " : " + count; } }
把所有業務邏輯寫在StreamJob類的main方法中,如下所示,關鍵位置都加了中文注釋:
public static void main(String[] args) throws Exception { //環境信息 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //數據來源是本機9999端口,換行符分隔,您也可以考慮將hostname和port參數通過main方法的入參傳入 DataStream<String> text = env.socketTextStream("localhost", 9999, "\n"); //通過text對象轉換得到新的DataStream對象, //轉換邏輯是分隔每個字符串,取得的所有單詞都創建一個WordWithCount對象 DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String s, Collector<WordWithCount> collector) throws Exception { for(String word : s.split("\\s")){ collector.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word")//key為word字段 .timeWindow(Time.seconds(5)) //五秒一次的翻滾時間窗口 .reduce(new ReduceFunction<WordWithCount>() { //reduce策略 @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word, a.count+b.count); } }); //單線程輸出結果 windowCounts.print().setParallelism(1); // 執行 env.execute("Flink Streaming Java API Skeleton"); }
構建
maven 打包
在Flink驗證
1、登錄到Flink所在機器,執行以下命令:
2、我這邊Flink所在機器的IP地址是192.168.11.107,因此用瀏覽器訪問的Flink的web地址為:http://192.168.11.107:8081;
3、選擇剛剛生成的jar文件作為一個新的任務,如下圖:
4、點擊下圖紅框中的"upload",將文件提交:
5、目前還只是將jar文件上傳了而已,接下來就是手工設置執行類並啟動任務,操作如下圖,填寫前面編寫的StreamingJob類的完整名稱:
6、提交后的頁面效果如下圖所示,可見一個job已經在運行中了:
7、回到Flink所在機器的控制台,在之前輸入了nc -l 9999的窗口輸入一些英文句子,然后按下回車鍵,例如:
8、接下來看看我們的job的執行效果,如下圖,點擊左側的"Task Managers",在右邊的列表中只有一個Task,點擊它:
9、點擊"Stdout"這個tab,就能見到我們的任務對之前句子中的單詞的統計結果,如下圖:
至此,第一個比較簡單的FLINK就完成了。