1. Flink、Storm、Sparkstreaming對比
storm------ ---------At-least-once----Record Acks-------無狀態管理-------- 低延遲-----------高吞吐sparkstreaming-----Exactly-once-------RDD Checkpoint-----基於DStream-----中等延遲-----高吞吐Flink-----------------Exactly-once------ Checkpoint-----------基於操作----------低延遲-------高吞吐
2. Flink DataStream API基本算子
3. Flink On Yarn
- 提高集群機器的利用率;
- 一套集群,可以同時執行MR任務,Spark任務,Flink任務。
- 一開始在Yarn上初始化一個集群;yarn-session.sh【開辟資源】+flink run【提交任務】
- 每個Flink job都申請一個集群,互不影響,任務執行之后資源會被釋放掉。flink run -m yarn-cluster【開辟資源+提交任務】
4. FlinkKafkaConnector(FlinkKafkaConsumer+FlinkKafkaProducer)
閱讀過FlinkKafkaConnector源碼后對Kafka偏移量的存儲機制有了一個全新的認識,這里有兩個坑,簡單記下:
- FlinkKafkaComsumer08用的Kafka老版本Consumer,偏移量提交Zookeeper,由Zookeeper保管,而FlinkKafkaComsumer09以后,偏移量默認存在Kafka內部的Topic中,不再向Zookeeper提交;
- 使用FlinkKafkaProducer08寫Kafka1.10(其他高版本沒試過)存在超時。
5. DataStream API之partition
- 隨機分區
1 dataStream.shuffle();
- 重分區,消除數據傾斜
1 dataStream.rebalance();
- 自定義分區
1 dataStream.partitionCustom(partiitoner,"somekey");
- 廣播分區:把元素廣播所有分區,會被重復消費
1 DataStream.broadcast();
6. Flink Distributed Cache(分布式緩存)
- 原理
- 用法
用法一:注冊一個文件
1 env.registerCachedFile(“hdfs:///path/to/yout/file","hdfsfile");
用法二:訪問數據
1 File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
7. State
8. Checkpoint
1 env.enableCheckpointing(1000); //每隔1000ms設置一個檢查點【檢查點周期】 2 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設置模式為exactly once(默認值) 3 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//檢查點之間至少有500ms的間隔【檢查點最小間隔】 4 env.getCheckpointConfig().setCheckpointTimeout(60000); //檢查點必須在一分鍾之內完成,否則丟棄【檢查點的超時時間】 5 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //同一時間點只允許一個檢查點 6 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
7 //Flink程序被取消后,會保留檢查點數據
9. Flink重啟策略
10. Window(窗口)
- 滾動窗口【沒有重疊】tumbling windows
1 .timeWindow(Time.minutes(1)); 2 .countWindow(100);
- 滑動窗口【有重疊】sliding windows
1 .timeWindow(Time.minutes(1),Time.seconds(30))//每隔30秒,統計一分鍾的數據 2 .countWindow(100,10)//每隔10條數據,統計100條的數據
- 會話窗口 session windows
- 增量聚合(窗口中每進入一條數據,就進行一次計算),例如reduce、aggregate、sum、min、max;
- 全量聚合(等窗口所有數據到齊,才開始計算,可以進行排序等)例如apply、process等
當然也可以分為keyed window和non-keyed window,分組的stream調用keyBy(...)和window(...),非分組的stream中window(...)換成了windowAll(...)
11. Time
- Event Time:日志產生的時間;
- Ingestion Time:事件從kafka等取出來,進入Flink的時間;
- Processing Time:事件被處理的時間,例如達到窗口處理的時間等。【默認】
12. Flink並行度
- Flink中每個TaskManager為集群提供slot,slot數量與每個節點的可用CPU核數成比例,slot上啟動進程,進程內有多個線程。如果任務管理器有n個槽,它會為每個槽分配 1/n 的內存,這里沒有對 CPU 進行隔離;目前任務槽僅僅用於划分任務的內存。
配置一個TaskManager有多少個並發的slot數有兩種配置方式:
- taskmanager.numberOfTaskSlots。在conf/flink-conf.yaml中更改,默認值為1,表示默認一個TaskManager只有1個task slot.
- 提交作業時通過參數配置。--yarnslots 1,表示TaskManager的slot數為1.
⚠️注意:slot不能搞太多,幾十個就行,你想啊假如你機器不多,TaskManager不多,搞那么多slot,每個slot分到的內存小的可憐,容易OOM啊
- 並行度的設置有多個地方:操作算子層面、執行環境層面、客戶端層面、系統層面,具體可以參考:FLINK並行度;
1 val wordCounts = text 2 .flatMap{ _.split(" ") map { (_, 1) } } 3 .keyBy(0) 4 .timeWindow(Time.seconds(5)) 5 .sum(1).setParallelism(5)
執行環境層面比如:
1 env.setParallelism(5)
提交任務的時候,在客戶端側flink可以通過-p參數來設置並行度。例如:
1 ./bin/flink run -p 5 ../examples/*WordCount-java*.jar
13. TaskManager數量
14. Flink傳遞參數給函數
-
使用構造函數方式
1 DataSet toFilter = env.fromElements(1, 2, 3); 2 toFilter.filter(new MyFilter(2)); 3 private static class MyFilter implements FilterFunction { 4 private final int limit; 5 public MyFilter(int limit) { 6 this.limit = limit; 7 } 8 @Override 9 public boolean filter(Integer value) throws Exception { 10 return value > limit; 11 } 12 }
1 DataSet toFilter = env.fromElements(1, 2, 3); 2 Configuration config = new Configuration(); 3 config.setInteger("limit", 2); 4 toFilter.filter(new RichFilterFunction() { 5 private int limit; 6 @Override 7 public void open(Configuration parameters) throws Exception { 8 limit = parameters.getInteger("limit", 0); 9 } 10 @Override 11 public boolean filter(Integer value) throws Exception { 12 return value > limit; 13 } 14 }).withParameters(config);
1 Configuration conf = new Configuration(); 2 conf.setString("mykey","myvalue"); 3 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 4 env.getConfig().setGlobalJobParameters(conf); 5 public static final class Tokenizer extends RichFlatMapFunction> { 6 private String mykey; 7 @Override 8 public void open(Configuration parameters) throws Exception { 9 super.open(parameters); 10 ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 11 Configuration globConf = (Configuration) globalParams; 12 mykey = globConf.getString("mykey", null); 13 } 14 // ... more here ...
15. Flink中的Metrics
1 import org.apache.flink.api.common.accumulators.LongCounter; 2 import org.apache.flink.api.common.functions.RichMapFunction; 3 import org.apache.flink.api.java.utils.ParameterTool; 4 import org.apache.flink.configuration.Configuration; 5 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; 6 import org.apache.flink.metrics.Counter; 7 import org.apache.flink.metrics.Meter; 8 import org.apache.flink.metrics.MetricGroup; 9 10 11 public class Map extends RichMapFunction<String, String> { 12 private transient Meter logTotalMeter; 13 private transient Counter logTotalConter; 14 private LongCounter logAcc = new LongCounter(); 15 16 @Override 17 public void open(Configuration parameters) throws Exception { 18 MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); 19 logTotalMeter = metricGroup.meter("logTotalMeter", 20 new DropwizardMeterWrapper(new com.codahale.metrics.Meter())); 21 logTotalConter = metricGroup.counter("logTotalConter"); 22 getRuntimeContext().addAccumulator("logAcc", this.logAcc); 23 } 24 25 26 @Override 27 public String map(String log) throws Exception { 28 logTotalConter.inc(); 29 logTotalMeter.markEvent(); 30 logAcc.add(NumberUtil.LONG_ONE); 31 logMeter.mark(); 32 return log; 33 } 34 }
16. Flink中的table API
聲明式-用戶只關心做什么,不用關心怎么做;高性能-支持查詢優化,可以獲取更好的執行性能;流批統一-相同的統計邏輯,既可以流模式也可以批模式;標准穩定-語義遵循SQL標准,不易變動易理解-所見即所得
1 // table API 2 tab.groupBy("word").select("word,count(1) as cnt") 3 4 // SQL 5 SELECT COUNT(1) AS cnt 6 FROM tab 7 GROUP BY word
總的來說,SQL有的功能table API都有,如下圖所示

三種注冊表的方式
三種發射表的方式
table API對列操作比較方便,比如
再比如有一張100列的表,選擇1到10列怎么操作?
另外,table API的map函數擴展起來也很方便
也可參考:Table API & SQL、Flink SQL-Client。
17. Flink中的內存管理
1 ./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
紅色參數的具體含義是(參見Flink官網):
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
先上兩個小圖(這倆圖我沒找到原版出處,一直覺得這玩意坑爹,大概率上這里的JVM Heap包括了on-heap和off-heap,on-heap和off-heap下邊有介紹)
看圖說話,TaskManager 的堆內存主要被分成了三個部分:
- Network Buffers: 一定數量的32KB大小的 buffer,主要用於數據的網絡傳輸。在 TaskManager 啟動的時候就會分配。默認數量是 2048 個,可以通過
taskmanager.network.numberOfBuffers
來配置。 - Memory Manager Pool: 這是一個由
MemoryManager
管理的,由眾多MemorySegment
組成的超大集合。Flink 中的算法(如 sort/shuffle/join)會向這個內存池申請 MemorySegment,將序列化后的數據存於其中,使用完后釋放回內存池。默認情況下,池子占了堆內存的 70% 的大小。 - Remaining (Free) Heap: 這部分的內存是留給用戶代碼以及 TaskManager 的數據結構使用的。因為這些數據結構一般都很小,所以基本上這些內存都是給用戶代碼使用的。從GC的角度來看,可以把這里看成的新生代,也就是說這里主要都是由用戶代碼生成的短期對象。
⚠️注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,該池子不會預分配內存,也不會向該池子請求內存塊。也就是說該部分的內存都是可以給用戶代碼使用的。不過社區是打算在 Streaming 模式下也能將該池子利用起來。
從堆的角度來說,Flink當前的內存支持堆內(on-heap)和堆外(off-heap)管理,用戶想去申請什么類型的內存,有相關的參數去配置。Flink off-heap的內存管理相對於on-heap的優點主要在於:
- 啟動分配了大內存(例如100G)的JVM很耗費時間,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都會很小,垃圾回收也不用考慮MemorySegment中的Java對象了,節省了GC時間;
- 有效防止OOM,MemorySegment大小固定,操作高效。如果MemorySegment不足寫到磁盤,內存中的數據不多,一般不會發生OOM;
- 更有效率的IO操作。在off-heap下,將MemorySegment寫到磁盤或是網絡可以支持zeor-copy技術,而on-heap的話則至少需要一次內存拷貝;
- off-heap上的數據可以和其他程序共享。
好了概念講了一通,看下任務TaskManager日志
1 2019-07-23 07:27:50,035 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Starting YARN TaskExecutor runner (Version: 1.7.1, Rev:<unknown>, Date:<unknown>) 2 2019-07-23 07:27:50,035 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - OS current user: yarn 3 2019-07-23 07:27:50,436 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Current Hadoop/Kerberos user: worker 4 2019-07-23 07:27:50,436 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.112-b15 5 2019-07-23 07:27:50,436 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Maximum heap size: 345 MiBytes 6 2019-07-23 07:27:50,437 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - JAVA_HOME: /usr/local/jdk/ 7 2019-07-23 07:27:50,438 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Hadoop version: 2.6.0-cdh5.5.0 8 2019-07-23 07:27:50,438 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - JVM Options: 9 2019-07-23 07:27:50,438 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -Xms360m 10 2019-07-23 07:27:50,439 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -Xmx360m 11 2019-07-23 07:27:50,439 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -XX:MaxDirectMemorySize=664m
紅色的地方有幾個值,Flink源碼中這幾個值的計算在TaskManagerServices.calculateHeapSizeMB(計算堆內內存大小)和calculateNetworkBufferMemory(計算堆外內存大小),下邊我們來看默認情況下這些值怎么計算的(僅限Flink 1.7.1版本,其他版本系數可能有變化)。
1. JVM預留內存,總內存的25%,最小預留,600M:
1024MB - (1024 * 0.25 < 600MB) -> 600MB = 424MB (cutoff)
2. 剩下的內存的10%作為networkBuffer的內存,最小64M:
424MB - (424MB * 0.1 < 64MB) -> 64MB(networkbuffer) = 360MB
3. 批作業的話剩下內存30%設為堆內內存,總內存減去堆內內存設為directMemory,流作業全部都是堆內內存了,用於netty和rocksDB和networkBuffer以及JVM自身內存。
這樣360M和664M都明了了,那為啥一開始日志打印“Maximum heap size: 345 MiBytes”呢,這里還有一個小坑,先看Flink怎么計算的,
1 /** 2 * The maximum JVM heap size, in bytes. 3 * 4 * <p>This method uses the <i>-Xmx</i> value of the JVM, if set. If not set, it returns (as 5 * a heuristic) 1/4th of the physical memory size. 6 * 7 * @return The maximum JVM heap size, in bytes. 8 */ 9 public static long getMaxJvmHeapMemory() { 10 final long maxMemory = Runtime.getRuntime().maxMemory(); 11 if (maxMemory != Long.MAX_VALUE) { 12 // we have the proper max memory 13 return maxMemory; 14 } else { 15 // max JVM heap size is not set - use the heuristic to use 1/4th of the physical memory 16 final long physicalMemory = Hardware.getSizeOfPhysicalMemory(); 17 if (physicalMemory != -1) { 18 // got proper value for physical memory 19 return physicalMemory / 4; 20 } else { 21 throw new RuntimeException("Could not determine the amount of free memory.\n" + 22 "Please set the maximum memory for the JVM, e.g. -Xmx512M for 512 megabytes."); 23 } 24 } 25 }
它用了Java lang的方法,這就有問題了,問題在於JVM使用的GC算法都會有一些內存丟失,比如Survivor有兩個,但只有1個會用到,另一個一直閑置,總有一塊Survivor區是不被計算到可用內存中的。
到底是不是呢,我在Flink程序的Map函數中加了這么一段代碼:
1 logger.info("Runtime max: " + mb(Runtime.getRuntime().maxMemory())); 2 MemoryMXBean m = ManagementFactory.getMemoryMXBean(); 3 4 logger.info("Non-heap: " + mb(m.getNonHeapMemoryUsage().getMax())); 5 logger.info("Heap: " + mb(m.getHeapMemoryUsage().getMax())); 6 7 for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) { 8 logger.info("Pool: " + mp.getName() + " (type " + mp.getType() + ")" + " = " + mb(mp.getUsage().getMax())); 9 }
打印日志出來
1 2019-07-20 09:03:16,344 INFO data.demo.core.example - Heap: 361758720 (345.00 M) 2 2019-07-20 09:03:16,344 INFO data.demo.core.example - Pool: PS Survivor Space (type Heap memory) = 15728640 (15.00 M) 3 2019-07-20 09:03:16,344 INFO data.demo.core.example - Pool: Code Cache (type Non-heap memory) = 251658240 (240.00 M) 4 2019-07-20 09:03:16,344 INFO data.demo.core.example - Pool: PS Old Gen (type Heap memory) = 251658240 (240.00 M) 5 2019-07-20 09:03:16,344 INFO data.demo.core.example - Pool: Metaspace (type Non-heap memory) = -1 (-0.00 M) 6 2019-07-20 09:03:16,344 INFO data.demo.core.example - Pool: Compressed Class Space (type Non-heap memory) = 1073741824 (1024.00 M) 7 2019-07-20 09:03:16,344 INFO data.demo.core.example - Pool: PS Eden Space (type Heap memory) = 94371840 (90.00 M) 8 2019-07-20 09:03:16,344 INFO data.demo.core.example - Pool: PS Survivor Space (type Heap memory) = 15728640 (15.00 M) 9 2019-07-20 09:03:16,345 INFO data.demo.core.example - Pool: PS Old Gen (type Heap memory) = 251658240 (240.00 M) 10 2019-07-20 09:03:16,345 INFO data.demo.core.example - Runtime max: 361758720 (345.00 M)
眼見為實,實錘了。
再看Flink TaskManager WebUI
有一些概念還要說下,比如JVM管理兩種類型的內存:堆(heap)和非堆(Nonheap),堆就是Java代碼可及的內存,所有類實例和數組的內存都是在堆上分配。非堆就是JVM留給自己用的,方法區、棧、每個類結構(如運行時常數池、字段和方法數據)以及方法和構造方法的代碼都在非堆內存中。
再比如Direct和Mapped,這倆是JVM緩沖池,主要是JNI使用。
UI上的這些參數我們都可以通過Flink的Rest API拿到下面這些指標值,比如通過訪問本地localhost:23799/taskmanagers/container_e37_1563420494990_0370_01_000009(具體參見Flink官網):
Scope | Infix | Metrics | Description |
---|---|---|---|
Job-/TaskManager | Status.JVM.Memory | Memory.Heap.Used | 當前使用的堆內存大小. |
Heap.Committed | 保證JVM可用的堆內存大小. | ||
Heap.Max | 可用於內存管理的堆內存最大值. | ||
NonHeap.Used | 當前使用的非堆內存大小. | ||
NonHeap.Committed | 保證JVM可用的非堆內存大小. | ||
NonHeap.Max | 可用於內存管理的非堆內存最大值. | ||
Direct.Count | 直接緩沖池中的緩沖區數量. | ||
Direct.MemoryUsed | JVM中用於直接緩沖池的內存大小. | ||
Direct.TotalCapacity | 直接緩沖池中所有緩沖區的總容量. | ||
Mapped.Count | 映射緩沖池中緩沖區的數量. | ||
Mapped.MemoryUsed | JVM中用於映射緩沖池的內存大小. | ||
Mapped.TotalCapacity | 映射緩沖池中緩沖區的數量. |
18. Flink中依賴配置
- Flink核心依賴:Flink本身由運行系統所需的一組類和依賴項組成,例如協調,網絡,檢查點,故障轉移,API,操作(如窗口),資源管理等。所有這些這些類和依賴項構成了Flink運行時的核心,在啟動Flink應用程序時必須存在。這些核心類和依賴項打包在flink-dist.jar中。它們是Flink lib文件夾的一部分。這部分不包含任何連接器或庫(CEP,SQL,ML等),以避免默認情況下在類路徑中具有過多的依賴項和類,保持默認的類路徑較小並避免依賴性沖突。Maven(和其他構建工具)將依賴項打包時一般將核心依賴設為provided,如果它們未設置為provided,可能使生成的JAR包過大,還可能出現添加到應用程序的jar文件的Flink核心依賴項與用戶自己的一些依賴版本沖突(可以通過反向類加載來避免);如果在IntelliJ IDEA中調試,則將scope設置為comiple,否則失敗報
NoClassDefFountError錯;
- 用戶應用程序依賴: connectors, formats, or libraries(CEP, SQL, ML),用戶應用程序通常打包到應用程序jar中。
另外,當Flink程序讀寫HDFS時需要添加Hadoop依賴,不要把Hadoop依賴直接添加到Flink application,而是: export HADOOP_CLASSPATH=`hadoop classpath`,Flink組件啟動時會使用該環境變量,這樣做是因為:
- 一些Hadoop交互發生在Flink的核心,可能在用戶應用程序啟動之前,例如為檢查點設置HDFS,通過Hadoop的Kerberos令牌進行身份驗證或在YARN上部署。
- Flink的反向類加載方法隱藏了核心依賴關系中的許多傳遞依賴關系,應用程序可以使用相同依賴項的不同版本,而不會遇到依賴項沖突。
19. Flink單元測試指南
20. Flink讀HDFS
1 DataSet<String> hdfslines=env.readTextFile("your hdfs path")
寫數據
1 hdfslines.writeAsText("your hdfs path")
以上會根據你的默認的線程數來生成多少個分區文件,如果你想最后生成一個文件的話,可以在后面使用setParallelism(1),這樣最后就只會生成一個文件了。具體可以這么整

1 try { 2 String topic = args[0]; 3 String path = args[1]; 4 //讀取配置文件 5 Configuration conf = new Configuration(); 6 //獲取文件系統 7 FileSystem fs = FileSystem.get(URI.create("/"), conf); 8 9 System.out.println(fs.getUri()); 10 if (!fs.getUri().toString().contains("hdfs")) { 11 path = "hdfs://localhost:8020" + path; 12 } 13 14 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 15 DataSet<String> union = env.readTextFile(fs.getUri() + path + "/").setParallelism(20); 16 17 union.rebalance().setParallelism(15).map(new MapFunction<String, String>() { 18 private static final long serialVersionUID = 1033071381217373267L; 19 20 @Override 21 public String map(String rawLog) throws Exception { 22 return rawLog; 23 } 24 }).output(new DiscardingOutputFormat<>()) 25 .setParallelism(20); 26 try { 27 env.execute(); 28 } catch (Exception e) { 29 e.printStackTrace(); 30 } 31 32 //關閉文件系統 33 fs.close(); 34 } catch (Exception e) { 35 logger.error("task submit process error, due to {}.", e.getMessage()); 36 e.printStackTrace(); 37 }
path是傳入的目錄,比如/user/rawlog/hourly/2019-09-15/09
配置