Flink系統配置


Flink 系統配置

Flink 提供了多個配置參數,用於調整Flink的行為與性能,所有參數均在flink-config.yaml 文件中。下面我們介紹一下幾個主要配置。

 

Java and Classloading

默認情況下,Flink啟動JVM進程時,會使用系統環境變量里的PATH路徑。當然,如果要使用自定義的Java 版本,可以指定JAVA_HOME 環境變量,或是Flink配置文件里的env.java.home 參數。Flink的JVM進程在啟動時,也可以配置自定義的JVM 選項(例如 gc 參數),配置的參數為env.java.opts,或者 env.java.opts.jobmanager,以及 env.java.opts.taskmanager.

如果執行的Flink任務使用的是外部依賴(而不是系統本地依賴),則一般不會有Classloading(類加載)的問題。在執行一個Flink應用時,此Flink程序jar包中所有的classes都必須通過一個classloader載入。Flink會將每個job的classes注冊到一個獨立的user-code classloader中,以確保執行的job的依賴不會與Flink的runtime依賴、或者其他job的依賴產生沖突。User-code class loaders 在job停掉的時候,會被清除。Flink系統的class loader會載入 lib 目錄下所有的jar 包文件,而user-code classloaders 亦是源於Flink系統的classloader。

默認情況下,Flink首先在child classloader(也就是user-code classloader)中查詢user-code classes,然后在parent classloader(也就是系統classloader)查詢classes。此機制可以避免job與Flink系統的版本沖突。不過,我們也可以通過配置classloader.resolve-order 參數轉變此順序,默認為child-first,可以修改為parent-first。

需要注意的是,有些在parent classloader中的類會永遠優先於child classloader載入,這些類在參數classloader.parent-first-patterns.default中指定。我們也可以在參數 classloader.parent-first-patterns.additional中指定一組classes,用於優先載入。

 

CPU

Flink 並不會主動限制它消耗的CPU資源,它通過processing slots來控制一個worker 進程(也就是TaskManager)可以被分配的tasks數。一個TaskManger提供了特定數目的slots,它們注冊於ResourceManager,並被其所管理。在執行程序時,一個JobManager會申請一個或多個slots,用於執行任務。一個slot可以運行一個application中任意operator的一個並行任務。所以JobManager需要至少申請operator最大並行度的slots數,才能保證任務正常執行。Task在TaskManager中以線程的方式執行,並且按需使用CPU資源。

TaskManager可提供的slots數量由taskmanager.numberOfTaskSlots 參數指定,默認為1。也就是每個TaskManager會提供一個slot。一般僅會在standalone模式下需要調整此參數,在YARN、Kubernetes、Mesos中不需要調整,因為它們可以根據資源申請更多container,而多個container也可以運行在單個節點上,效果與TaskManager中的slot基本一致。

 

Main Memory and NetworkBuffers

Flink的master與worker進程有不同的內存需求。Master進程主要管理計算資源(也就是ResourceManager)以及協調applications的執行(也就是JobManager)。而worker進程的需要進行各類計算並處理數據(可能是大量數據)。

一般來說,master進程的內存需求並不是特別大。默認情況下,它使用1GB的JVM堆內存。如果一個master進程需要管理多個applications,或者有多個operators的一個application,則我們可能需要通過jobmanager.heap.size配置增加它的堆內存。

配置worker進程的內存時,考慮的方面會更多,因為其中會有不同的組件分配不同類型的內存資源。最重要的是JVM 堆內存,通過taskmanager.heap.size進行配置。堆內存由所有objects使用,包括TaskManager runtime、operators、application的functions、以及傳輸的數據。若一個應用使用的是in-memory或是filesystem 的state backend,則state數據也會存儲在JVM。需要注意的是單個task也是有可能消耗掉它所處JVM所有堆內存的。為每個TaskManager僅配置一個slot,會有更好的資源分離,並且防止不同application之間的異常干擾。如果一個application有很多依賴包,則JVM noheap內存也可以增長到很大,因為這部分noheap內存存儲了所有TaskManager classes與user-code classes。

除了JVM,還有另外兩部分內存消耗:Flink的網絡棧與RocksDB(若是使用了它作為state backend)。Flink的網絡棧基於的是Netty庫,它從native memory(也就是off-heap memory)中分配網絡緩沖區(network buffers)。Flink需要足夠數量的network buffers,用於在worker 進程之間交換records。緩沖區的數量取決於operator tasks之間的網絡連接數量。若是一個application有多個partitioning steps,則此數量的總量可能會非常大,並需要大量的內存用於網絡傳輸。

Flink的默認配置僅適用於較小的計算規模,若是集群計算規模較大,則需要做對應參數調整。如果buffers的數量配置的不合適,則job提交后會拋出以下異常:

java.io.IOException: Insufficient number of network buffers

遇到此情況后,需要為網絡棧提供更多的內存。

分配給network buffers內存大小的參數由taskmanager.network.memory.fraction 指定,表示的是JVM內存的百分比多少用於network buffers。taskmanager.memory.segment-size參數指定的是一個network buffer的大小,默認是32KB。調小此參數,也就意味了可以增加network buffers數,但是會影響網絡棧的效率。當然,也可以設置network buffer使用內存的最小值(taskmanager.network.memory.min)與最大值(taskmanager.network.memory.max),默認分別為 64MB 與 1GB。這兩個絕對值可以限制配置的相對值(也就是之前提到的內存比例)。

RocksDB是另一個需要考慮的內存消耗因素(此處仍然是worker 進程中的內存)。但是衡量RocksDB消耗多少內存一般很難有個直接的結果,因為它取決於一個application中keyed states的數量。Flink會為一個keyed operator的每個task均創建一個獨立的(也是內嵌的)RocksDB實例。在每個實例內部,每個不同的state均被存儲在一個特定的列族(或表)中。在默認配置下,每個列族需要大約200MB到240MB的堆外內存。對於RocksDB的參數調整,暫時沒有一個萬金油,需要根據業務進行多個參數調整與測試,然后觀察性能。

所以我們在配置調整TaskManager的內存時,主要調整的是JVM的堆內存、RocksDB(如果它是作為state backend)、以及用於網絡棧的內存。

 

磁盤

一個Flink worker 進程會因多種原因將數據存儲到本地文件系統,包括:

  1. 接收application JAR 文件
  2. 寫日志文件
  3. (若是配置了RocksDB為state backend)維護application state

與磁盤相關的配置項有 io.tmp.dirs,我們可以指定一個或多個文件夾(以冒號為分隔符),用於存儲數據到本地文件系統。默認情況下,數據會寫入到默認的臨時文件夾(由Java 的 java.io.tmpdir 指定)或是Linux的臨時文件夾(/tmp)。

需要注意的是:請確保臨時目錄不會被自動清除,有些Linux 發行版會自動清除臨時文件夾/tmp。請務必關閉此功能,否則會在job recovery時,由於/tmp文件夾被清理,導致丟失metadata,並最終job 失敗。

blob.storage.directory 的參數指定了blob server的本地存儲文件夾,用於exchange較大的文件(例如application JAR 文件)。env.log.dir 的參數指定的是TaskManager寫入日志的文件夾。最后,RocksDB state backend 會將application的state維持在本地文件系統中。此目錄由state.backend.rocksdb.localdir 參數指定。如果此配置文件未顯示指定,則RocksDB會使用io.tmp.dirs 參數的值。

 

Checkpointing State Backends

Flink提供幾種選項,用於設置:state backends 如何給它們的state做檢查點(checkpoint)。一般我們會在application 代碼里手動指定。當然我們也可以通過 Flink 的配置文件,提供默認選項。如果未在代碼里指定此選項,則會使用默認選項。

State backend 維護的是application的state,所以它也會直接影響到一個application的性能。我們可以通過state.backend 指定默認的state backend。進一步的,我們可以啟用異步檢查點(state.backend.async)以及增量檢查點(state.backend.incremental)。一些backend 並不支持所有的這些選項,所以可能會直接忽視這些配置。我們也可以配置用於檢查點(checkpoint)與保存點(savepoint)寫入的遠端存儲根目錄。配置參數分別為 state.checkpoint.dir 以及 state.savepoints.dir

一些檢查點的選項是某些state backend 特有的。對於RocksDB state backend來說,我們可以定義一個或多個RocksSB存儲它本地文件的路徑(state.backend.rocksdb.localdir),以及timer state是存儲在堆內存(默認是存堆內存)還是RocksDB(配置的參數項為state.backend.rocksdb.timer-service.factory)。

最后,我們可以為一個Flink集群默認啟用並配置本地recovery,參數為 state.backend.local-recovery(設置為true即可)。Local state的副本存儲的位置也可以配置,參數為 taskmanager.state.local.root-dirs

 

安全

在Flink中,需要對無授權的訪問以及數據訪問進行安全地限制。對此,Flink支持Kerberos認證,並且可以配置使用SSL對所有網絡通信加密。

Flink可以支持Kerberos接入Hadoop生態(YARN,HDFS,HBase)、以及ZooKeeper與Kafka。Flink支持兩種認證模式:Keytabs與Hadoop delegation tokens。Keytabs是較好的選擇,因為tokens會在一段時間后失效,對長時間流處理應用產生影響。需要注意的是,認證密鑰是綁定於Flink集群,而不是一個單獨的任務。所有運行在同一個集群上的applications使用同樣的認證token。如果需要換另外的credentials,則可以啟動一個新集群。配置方法可以參考Flink官方文檔。

Flink支持兩個通信端的雙向認證,並且可以通過SSL對內部或外部網絡通信進行加密。對於內部的通信(RPC調用,數據傳輸,以及blob 服務通信(用於分發libraries等文件),所有的Flink 進程(Dispathcer、ResourceManager、JobManager以及TaskManager)均可以執行雙向認證 – 也就是發送端與服務端可以通過SSL證書進行驗證。此證書作為一個共享密鑰,可以內嵌到containers或附加到YARN啟動項里。

對於所有外部與Flink服務的通信(提交任務、控制任務、以及訪問REST 接口等),我們也可以啟用SSL為這些通信進行加密。雙向認證也可以被啟用。不過,較為推薦的方法是:配置一個代理服務,用於控制對REST終端節點的訪問。因為代理服務可以提供更多的認證與配置選項(相對於Flink來說)。

默認情況下,SSL認證與加密未啟用,因為需要額外的手動配置項(如證書、設置KeyStores與TrustStores等)。若有需要,可以參考Flink官方文檔進行配置。

 

References

Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM