步驟列表
本次實戰經歷以下步驟:
創建應用;
編碼;
構建;
提交任務到Flink,驗證功能;
環境信息
Flink:1.7;
Flink所在機器的操作系統:CentOS Linux release 7.5.1804;
開發環境JDK:1.8.0_181;
開發環境Maven:3.5.0;
應用功能簡介
在Flink運行SocketWindowWordCount.jar,實現的功能是從socket讀取字符串,將其中的每個單詞的數量統計出來,今天我們就來編碼開發這個應用,實現此功能;
1、應用基本代碼是通過mvn命令創建的,在命令行輸入以下命令
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
2、按控制台的提示輸入groupId、artifactId、version、package等信息,一路回車確認后,會生成一個和你輸入的artifactId同名的文件夾,里面是個maven工程:
Define value for property 'groupId': com.hjp Define value for property 'artifactId': socketwordcountdemo Define value for property 'version' 1.0-SNAPSHOT: : Define value for property 'package' com.hjp: : Confirm properties configuration: groupId: com.hjp artifactId: socketwordcountdemo version: 1.0-SNAPSHOT package: com.hjp
3、用IEDA導入這個maven工程,如下圖,已經有了兩個類:BatchJob和StreamingJob,BatchJob是用於批處理的,本次實戰用不上,因此可以刪除,只保留流處理的StreamingJob:

應用創建成功,接下來可以開始編碼了;
在StreamingJob類中添加靜態內部類WordWithCount,這是個PoJo,用來保存一個具體的單詞及其出現頻率:
/**
* 記錄單詞及其出現頻率的Pojo
*/
public static class WordWithCount {
/**
* 單詞內容
*/
public String word;
/**
* 出現頻率
*/
public long count;
public WordWithCount() {
super();
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
/**
* 將單詞內容和頻率展示出來
* @return
*/
@Override
public String toString() {
return word + " : " + count;
}
}
把所有業務邏輯寫在StreamJob類的main方法中,如下所示,關鍵位置都加了中文注釋:
public static void main(String[] args) throws Exception {
//環境信息
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//數據來源是本機9999端口,換行符分隔,您也可以考慮將hostname和port參數通過main方法的入參傳入
DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
//通過text對象轉換得到新的DataStream對象,
//轉換邏輯是分隔每個字符串,取得的所有單詞都創建一個WordWithCount對象
DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
for(String word : s.split("\\s")){
collector.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")//key為word字段
.timeWindow(Time.seconds(5)) //五秒一次的翻滾時間窗口
.reduce(new ReduceFunction<WordWithCount>() { //reduce策略
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word, a.count+b.count);
}
});
//單線程輸出結果
windowCounts.print().setParallelism(1);
// 執行
env.execute("Flink Streaming Java API Skeleton");
}
構建
maven 打包

在Flink驗證
1、登錄到Flink所在機器,執行以下命令:

2、我這邊Flink所在機器的IP地址是192.168.11.107,因此用瀏覽器訪問的Flink的web地址為:http://192.168.11.107:8081;
3、選擇剛剛生成的jar文件作為一個新的任務,如下圖:

4、點擊下圖紅框中的"upload",將文件提交:

5、目前還只是將jar文件上傳了而已,接下來就是手工設置執行類並啟動任務,操作如下圖,填寫前面編寫的StreamingJob類的完整名稱:

6、提交后的頁面效果如下圖所示,可見一個job已經在運行中了:

7、回到Flink所在機器的控制台,在之前輸入了nc -l 9999的窗口輸入一些英文句子,然后按下回車鍵,例如:

8、接下來看看我們的job的執行效果,如下圖,點擊左側的"Task Managers",在右邊的列表中只有一個Task,點擊它:

9、點擊"Stdout"這個tab,就能見到我們的任務對之前句子中的單詞的統計結果,如下圖:

至此,第一個比較簡單的FLINK就完成了。
