Flink 參數配置和常見參數調優


1、Flink參數配置

  • jobmanger.rpc.address:jobmanger的地址
  • jobmanger.rpc.port:jobmanger的端口
  • jobmanager.heap.mb:jobmanager的堆內存大小。不建議配的太大,1-2G足夠。
  • taskmanager.heap.mb:taskmanager的堆內存大小。大小視任務量而定。需要存儲任務的中間值,網絡緩存,用戶數據等。
  • taskmanager.numberOfTaskSlots:slot數量。
    • 在yarn模式使用的時候會受到yarn.scheduler.maximum-allocation-vcores值的影響。
    • 此處指定的slot數量如果超過yarn的maximum-allocation-vcores,flink啟動會報錯。
    • 在yarn模式,flink啟動的task manager個數可以參照如下計算公式:num_of_manager = ceil(parallelism / slot)  即並行度除以slot個數,結果向上取整。
  • parallelsm.default:任務默認並行度,如果任務未指定並行度,將采用此設置。
  • web.port: Flink web ui的端口號。
  • jobmanager.archive.fs.dir: 將已完成的任務歸檔存儲的目錄。
  • history.web.port: 基於web的history server的端口號。
  • historyserver.archive.fs.dir:history server的歸檔目錄。該配置必須包含jobmanager.archive.fs.dir配置的目錄,以便history server能夠讀取到已完成的任務信息
  • historyserver.archive.fs.refresh-interval: 刷新存檔作業目錄時間間隔
  • state.backend: 存儲和檢查點的后台存儲。可選值為rocksdb、filesystem、hdfs。
  • state.backend.fs.checkpointdir:檢查點數據文件和元數據的默認目錄。
  • state.checkpoints.dir:保存檢查點的目錄
  • state.savepoints.dir:save point的目錄
  • state.checkpoints.num-retained:保留最近檢查點的數量
  • state.backend.incremental:增量存儲
  • akka.ask.timeout:jobmanager和task manager通信連接的超時時間。如果網絡擁擠經常出現超時錯誤,可以增大該配置
  • akka.watch.heartbeat.interval:心跳發送間隔,用來檢測task manager的狀態
  • akka.watch.heartbeat.pause:如果超過該時間仍未收到task manager的心跳,該task manager會被認為已掛掉
  • taskmanager.network.memory.max:網絡緩存區最大內存大小
  • taskmanager.network.memory.min:網絡緩存區最小內存大小
  • taskmanager.network.memory.fraction:網絡緩沖區使用的內存占用總JVM內存的比例。如果配置了taskmanager.network.memory.max和taskmanager.network.memory.min的配置會被覆蓋
  • fs.hdfs.hadoopconf:hadoop配置文件路徑(已被廢棄,建議使用HADOOP_CONF_DIR環境變量)
  • yarn.application-attempts:job失敗嘗試次數,指jobmanager的重啟嘗試次數。該指不應該超過yarn-site.xml中的yarn.resourcemanager.am.max-attempts的值

2、Flink HA(Job Manager)的配置

  • high-availability:zookeeper

使用zookeeper負責HA實現

  • high-availability.zookeeper.path.root:/flink

flink信息在zookeeper存儲節點的名稱

  • high-availability.zookeeper.quorum:hadoop100:2181,hadoop101:2181,hadoop103:2181

zk集群節點的地址和端口

  • high-availability.storageDir: hdfs://nameservice/flink/ha/

job manager元數據在文件系統存儲的位置,zk僅保存了指向該目錄的指針

3、Flink metrics監控相關配置

  • metrics.reporters: prom
  • metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  • metrics.reporter.prom.port: 9250-9260

4、Kafka相關調優配置

linger.ms/batch.size:這兩個配置項配合使用,可以在吞吐量和延遲中得到最佳的平衡點。

batch.size是kafka producer發送數據的批量大小,當數據量達到batch size的時候,會將這批數據發送出去,避免了數據一條一條的發送,頻繁建立和斷開網絡連接。但是如果數據量比較小,導致遲遲不能達到batch.size,為了保證延遲不會過大,kafka不能無限等待數據量達到batch.size的時候才發送。為了解決這個問題,引入了linger.ms配置項。當數據在緩存中的時間超過linger.ms時,無論緩存中數據是否達到批量大小,都會被強制發送出去

kafka topic分區數和Flink並行度的關系

Flink Kafka source的並行度需要和kafka topic的分區數一致。最大化利用kafka多分區topic的並行讀取能力

5、Yarn相關調優配置

  • yarn.scheduler.maximum-allocation-vcores
  • yarn.scheduler.minimum-allocation-vcores

Flink單個Task Manager的slot數量必須結余這兩個值之間

Flink的Job Manager和Task Manager內存不得超過container最大分配內存大小

  • yarn.nodemanager.resource.cpu-vcores

yarn的虛擬cpu內核數,設置為物理機CPU核心數的2-3倍。會導致CPU資源無法被充分利用,跑任務的時候CPU占用率不高。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM