一、Flink概述 官網:https://flink.apache.org/ mapreduce-->maxcompute HBase-->部門 quickBI DataV Hive-->高德地圖 Storm-->Jstorm ...... 2019年1月,阿里正式開源flink-->blink Apache Flink是一個框架和分布式處理引擎,用於對無界和有界數據流進行有狀態計算。 Flink設 計為在所有常見的集群環境中運行,以內存速度和任何規模執行計算。 大數據計算框架 二、Flink特點 1)mapreduce 2)storm 3)spark 適用於所有企業,不同企業有不同的業務場景。處理數據量,模型都不一樣。 1)隨機部署應用 以其他組件集成! flink是分布式系統,需要計算資源才可執行程序。flink可以與常見的集群資源管理器進行集成(Hadoop Yarn,Apache Mesos...)。 可以單獨作為獨立集群運行。 通過不同部署模式實現。 這些模式允許flink以其慣有的方式進行交互。 當我們部署flink應用程序時,Flink會根據應用程序配置的並行性自動識別所需資源。從資源管理器中請求它們。 如果發生故障,flink會請求新的資源來替換發生故障的容器。 提交或控制程序都通過REST調用進行,簡化Flink在許多環境的集成。孵化... 2)以任何比例應用程序(小集群、無限集群) Flink旨在以任何規模運行有狀態流應用程序。應用程序可以並行化在集群中分布和同時執行程序。 因此,我們的應用集群可以利用無限的cpu和磁盤與網絡IO。 Flink可以輕松的維護非常大的應用程序狀態。 用戶可拓展性報告: 1)應用程序每天可以處理萬億個事件 2)應用程序每天可以維護多個TB的狀態 3)應用程序可以在數千個內核運行 3)利用內存中的性能 有狀態Flink應用程序針對於對本地狀態訪問進行了優化。任務狀態始終的保留在內存中,或者如果 大小超過了可用內存,則保存在訪問高效的磁盤數據結構中(SSD 機械/固態)。 任務可以通過訪問本地來執行所有計算。從來產生極小的延遲。 Flink定期和異步檢查本地狀態持久存儲來保持出現故障時一次狀態的一致性。 三、有界無界 1)無界 有開始,沒有結束... 處理實時數據。 2)有界 有開始,有結束... 處理批量數據。 四、無界數據集應用場景(實時計算) 1)源源不斷的日志數據 2)web應用,指標分析 3)移動設備終端(分析app狀況) 4)應用在任何數據源不斷產生的項目中 五、Flink運行模型 1)流計算 數據源源不斷產生,我們的需求是源源不斷的處理。程序需要一直保持在計算的狀態。 2)批處理 計算一段完整的數據集,計算成功后釋放資源,那么此時工作結束。 六、Flink的使用 1)處理結果准確:無論是有序數據還是延遲到達的數據。 2)容錯機制: 有狀態:保持每次的結果往下傳遞,實現累加。DAG(有向無環圖)。 3)有很強大的吞吐量和低延遲。 計算速度快,吞吐量處理的量級大。 4)精准的維護一次的應用狀態。 storm:會發生要么多計算一次,要么漏計算。 5)支持大規模的計算 可以運行在數千台節點上。 6)支持流處理和窗口化操作 7)版本化處理 8)檢查點機制實現精准的一次性計算保證 checkpoint 9)支持yarn與mesos資源管理器 七、flink單節點安裝部署 1)下載安裝包 https://archive.apache.org/dist/flink/flink-1.6.2/flink-1.6.2-bin-hadoop28-scala_2.11.tgz 2)上傳安裝包到/root下 3)解壓 cd /root tar -zxvf flink-1.6.2-bin-hadoop28-scala_2.11.tgz -C hd 4)啟動 cd /root/hd/flink-1.6.2 bin/start-cluster.sh 5)啟動 cd /root/hd/flink-1.6.2 bin/stop-cluster.sh 6)訪問ui界面 http://192.168.146.132:8081 八、flink集群安裝部署 1)下載安裝包 https://archive.apache.org/dist/flink/flink-1.6.2/flink-1.6.2-bin-hadoop28-scala_2.11.tgz 2)上傳安裝包到/root下 3)解壓 cd /root tar -zxvf flink-1.6.2-bin-hadoop28-scala_2.11.tgz -C hd 4)修改配置文件 vi flink-conf.yaml 第33行修改為: jobmanager.rpc.address: hd09-1 5)修改slaves vi slaves hd09-2 hd09-3 6)分發flink到其他機器 cd /root/hd scp -r flink-1.6.2/ hd09-2:$PWD scp -r flink-1.6.2/ hd09-3:$PWD 7)啟動集群 cd /root/hd/flink-1.6.2 bin/start-cluster.sh 8)關閉集群 cd /root/hd/flink-1.6.2 bin/stop-cluster.sh 9)訪問ui界面 http://192.168.146.132:8081
九、flink結構

十、WordCount簡單實現
需求:實時的wordcount
往端口中發送數據,實時的計算數據
1、SocketWordCount類
package com.demo.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; /** * 需求:實時的wordcount * 往端口中發送數據,實時的計算數據 */ public class SocketWordCount { public static void main(String[] args) throws Exception { //1.定義連接端口 final int port = 9999; //2.創建執行環境對象 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //3.得到套接字對象(指定:主機、端口、分隔符) DataStreamSource<String> text = env.socketTextStream("192.168.146.132", port, "\n"); //4.解析數據,統計數據-單詞計數 hello lz hello world DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() { public void flatMap(String s, Collector<WordWithCount> collector){ //按照空白符進行切割 for (String word : s.split("\\s")) { //<單詞,1> collector.collect(new WordWithCount(word, 1L)); } } }) //按照key進行分組 .keyBy("word") //設置窗口的時間長度 5秒一次窗口 1秒計算一次 .timeWindow(Time.seconds(5), Time.seconds(1)) //聚合,聚合函數 .reduce(new ReduceFunction<WordWithCount>() { public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { //按照key聚合 return new WordWithCount(a.word, a.count + b.count); } }); //5.打印可以設置並發度 windowCounts.print().setParallelism(1); //6.執行程序 env.execute("Socket window WordCount"); } public static class WordWithCount { public String word; public long count; public WordWithCount() { } public WordWithCount(String word, long count){ this.word = word; this.count = count; } public String toString(){ return word + " : " + count; } } }
2、flink的maven依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.2</version>
</dependency>
3、運行SocketWordCount類的main方法
4、服務器安裝netcat
// 安裝netcat
yum install -y nc
// 使用nc,其中9999是SocketWordCount類中定義的端口號
nc -lk -p 9999
5、此時在服務器的nc下輸入單詞后,SocketWordCount的main方法會時時監控到該單詞並進行計算處理。
6、也可以把SocketWordCount程序打成jar包放置到服務器上,執行
[root@hd09-1 flink-1.6.2]# bin/flink run -c com.demo.flink.SocketWordCount /root/FlinkTest-1.0-SNAPSHOT.jar
啟動WordCount計算程序,此時結果會寫到/root/hd/flink-1.6.2/log下的flink-root-taskexecutor-0-hd09-1.out文件中。
