1 配置內存
如果頻繁出現Full GC,需要優化GC
在客戶端的"conf/flink-conf.yaml"配置文件中,在“env.java.opts”配置項中添加參數:
-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M
-
優化GC
調整老年代和新生代的比值。在客戶端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置項中添加參數:“-XX:NewRatio”。
如“ -XX:NewRatio=2”,則表示老年代與新生代的比值為2:1,新生代占整個堆空間的1/3,老年代占2/3。
-
開發Flink應用程序時,優化DataStream的數據分區或分組操作。
keyBy盡量不要使用String。
2 設置並行度
- 並行度控制任務的數量,影響操作后數據被切分成的塊數,調整並行度讓任務的數量和每個任務處理的數據與機器的處理能力達到最優。
- 查看CPU使用情況和內存占用情況,當任務和數據不是平均分布在各節點,而是集中在個別節點時,可以增大並行度使任務和數據更均勻的分布在各個節點。
- 增加任務的並行度,充分利用集群機器的計算能力,一般並行度設置為集群CPU核數總和的2-3倍。
並行度分為4個等級
Flink程序運行在執行環境中。執行環境為所有執行的算子、數據源、data sink定義了一個默認的並行度。
- 算子層次 : 跟在某個算子后面用setParallelism()指定
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
- 執行環境層次
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
- 客戶端層次 '-p' 指令
./bin/flink run -p 5 ../xxxx/xxxxx.jar
- 系統層次
在客戶端conf目錄下的“flink-conf.yaml”文件中的“parallelism.default”配置選項來指定所有執行環境的默認並行度。
3.配置進程參數
- 配置JobManager內存
JobManager負責任務的調度,以及TaskManager、RM之間的消息通信。當任務數變多,任務平行度增大時,JobManager內存都需要相應增大。您可以根據實際任務數量的多少,為 JobManager設置一個合適的內存。
-
-
在使用yarn-session命令時,添加“-jm MEM”參數設置內存。
-
在使用yarn-cluster命令時,添加“-yjm MEM”參數設置內存。
-
- 配置TaskManager個數每個TaskManager每個核同時能跑一個task,所以增加了TaskManager的個數相當於增大了任務的並發度。在資源充足的情況下,可以相應增加TaskManager的個數,以提高運行效率。
-
在使用yarn-session命令時,添加“-n NUM”參數設置TaskManager個數。
-
在使用yarn-cluster命令時,添加“-yn NUM”參數設置TaskManager個數。
-
- 配置TaskManager Slot數每個TaskManager多個核同時能跑多個task,相當於增大了任務的並發度。但是由於所有核共用TaskManager的內存,所以要在內存和核數之間做好平衡。
-
在使用yarn-session命令時,添加“-s NUM”參數設置SLOT數。
-
在使用yarn-cluster命令時,添加“-ys NUM”參數設置SLOT數
-
- 配置TaskManager內存TaskManager的內存主要用於任務執行、通信等。當一個任務很大的時候,可能需要較多資源,因而內存也可以做相應的增加。
-
將在使用yarn-sesion命令時,添加“-tm MEM”參數設置內存。
-
將在使用yarn-cluster命令時,添加“-ytm MEM”參數設置內存。
-
4.設計分區方法
合理的設計分區依據,可以優化task的切分。在程序編寫過程中要盡量分區均勻,這樣可以實現每個task數據不傾斜,防止由於某個task的執行時間過長導致整個任務執行緩慢。
以下是幾種分區方法
-
隨機分區:將元素隨機地進行分區。dataStream.shuffle();
-
Rebalancing (Round-robin partitioning):基於round-robin對元素進行分區,使得每個分區負責均衡。對於存在數據傾斜的性能優化是很有用的。dataStream.rebalance();
-
Rescaling:以round-robin的形式將元素分區到下游操作的子集中。如果你想要將數據從一個源的每個並行實例中散發到一些mappers的子集中,用來分散負載,但是又不想要完全rebalance 介入(引入rebalance()),這會非常有用。dataStream.rescale();
-
廣播:廣播每個元素到所有分區。dataStream.broadcast();
-
自定義分區:使用一個用戶自定義的Partitioner對每一個元素選擇目標task,由於用戶對自己的數據更加熟悉,可以按照某個特征進行分區,從而優化任務執行。簡單示例如下所示:
// fromElements構造簡單的Tuple2流 DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100)); // 定義用於分區的key值,返回即屬於哪個partition的,該值加1就是對應的子任務的id號 Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() { @Override public int partition(Tuple2<String, Integer> key, int numPartitions) { return (key.f0.length() + key.f1) % numPartitions; } }; // 使用Tuple2進行分區的key值 dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception { return value; } }).print();
5.配置netty網絡通信
Flink通信主要依賴netty網絡,所以在Flink應用執行過程中,netty的設置尤為重要,網絡通信的好壞直接決定着數據交換的速度以及任務執行的效率。
可在客戶端的“conf/flink-conf.yaml”配置文件中進行修改適配,默認已經是相對較優解,請謹慎修改,防止性能下降。
- “taskmanager.network.netty.num-arenas”:默認是“taskmanager.numberOfTaskSlots”,表示netty的域的數量。
- “taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默認是“taskmanager.numberOfTaskSlots”,表示netty的客戶端和服務端的線程數目設置。
- “taskmanager.network.netty.client.connectTimeoutSec”:默認是120s,表示taskmanager的客戶端連接超時的時間。
- “taskmanager.network.netty.sendReceiveBufferSize”:默認是系統緩沖區大小(cat /proc/sys/net/ipv4/tcp _ [rw]mem) ,一般為4MB,表示netty的發送和接收的緩沖區大小。
- “taskmanager.network.netty.transport”:默認為“nio”方式,表示netty的傳輸方式,有“nio”和“epoll”兩種方式。
6.解決數據傾斜
當數據發生傾斜(某一部分數據量特別大),雖然沒有GC(Gabage Collection,垃圾回收),但是task執行時間嚴重不一致。
-
需要重新設計key,以更小粒度的key使得task大小合理化。
-
修改並行度。
-
調用rebalance操作,使數據分區均勻。
緩沖區超時設置
-
由於task在執行過程中存在數據通過網絡進行交換,數據在不同服務器之間傳遞的緩沖區超時時間可以通過setBufferTimeout進行設置。
-
當設置“setBufferTimeout(-1)”,會等待緩沖區滿之后才會刷新,使其達到最大吞吐量;
-
當設置“setBufferTimeout(0)”時,可以最小化延遲,數據一旦接收到就會刷新;
-
當設置“setBufferTimeout”大於0時,緩沖區會在該時間之后超時,然后進行緩沖區的刷新。示例可以參考如下:
-
env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(newMyMapper()).setBufferTimeout(timeoutMillis);
7.Checkpoint 調優
Flink 為了達到容錯和 exactly-once 語義的功能,定期把 state 持久化下來,而這一持久化的過程就叫做 checkpoint ,它是 Flink Job 在某一時刻全局狀態的快照。
8.Flink 作業的問題定位
問題定位口訣: 一壓二查三指標,延遲吞吐是核心。時刻關注資源量 , 排查首先看GC。
- 一壓是指背壓,遇到問題先看背壓的情況。
- 看反壓 :通常最后一個被壓高的 subTask 的下游就是 job 的瓶頸之一。
- 二查就是指 checkpoint ,對齊數據的時間是否很長,state 是否很大,這些都是和系統吞吐密切相關的。
- 看 Checkpoint 時長 :Checkpoint 時長能在一定程度影響 job 的整體吞吐。
- 三指標就是指 Flink UI 那塊的一些展示,我們的主要關注點其實就是延遲和吞吐,系統資源,還有就是 GC logs。
- 看核心指標 :指標是對一個任務性能精准判斷的依據,延遲指標和吞吐則是其中最為關鍵的指標。
- 資源的使用率:提高資源的利用率是最終的目的。
9.常見的性能問題
- 在關注背壓的時候大家往往忽略了數據的序列化和反序列化,過程所造成的性能問題。
- 一些數據結構 ,比如 HashMap 和 HashSet 這種 key 需要經過 hash 計算的數據結構,在數據量大的時候使用 keyby 進行操作, 造成的性能影響是非常大的。
- 數據傾斜 影響系統的吞吐。
- 如果我們的下游是 MySQL,HBase這種,我們都會進行一個批處理的操作,就是讓數據存儲到一個 buffer 里面,在達到某些條件的時候再進行發送,這樣做的目的就是減少和外部5. 系統的交互,降低 網絡開銷 的成本。
- 頻繁GC ,無論是 CMS 也好,G1也好,在進行 GC 的時候,都會停止整個作業的運行,GC 時間較長還會導致 JobManager 和 TaskManager 沒有辦法准時發送心跳,此時 JobManager 就會認為此 TaskManager 失聯,它就會另外開啟一個新的 TaskManager
- 窗口是一種可以把無限數據切割為有限數據塊的手段。比如我們知道,使用滑動窗口的時候數據的重疊問題,size = 5min 雖然不屬於大窗口的范疇,可是 step = 1s 代表1秒就要進行一次數據的處理,這樣就會造成數據的重疊很高,數據量很大的問題。
10.Flink 作業調優
使用布隆過濾器實現