1、Flink參數配置
- jobmanger.rpc.address:jobmanger的地址
- jobmanger.rpc.port:jobmanger的端口
- jobmanager.heap.mb:jobmanager的堆內存大小。不建議配的太大,1-2G足夠。
- taskmanager.heap.mb:taskmanager的堆內存大小。大小視任務量而定。需要存儲任務的中間值,網絡緩存,用戶數據等。
- taskmanager.numberOfTaskSlots:slot數量。
- 在yarn模式使用的時候會受到
yarn.scheduler.maximum-allocation-vcores
值的影響。 - 此處指定的slot數量如果超過yarn的maximum-allocation-vcores,flink啟動會報錯。
- 在yarn模式,flink啟動的task manager個數可以參照如下計算公式:
num_of_manager = ceil(parallelism / slot)
即並行度除以slot個數,結果向上取整。
- 在yarn模式使用的時候會受到
- parallelsm.default:任務默認並行度,如果任務未指定並行度,將采用此設置。
- web.port: Flink web ui的端口號。
- jobmanager.archive.fs.dir: 將已完成的任務歸檔存儲的目錄。
- history.web.port: 基於web的history server的端口號。
- historyserver.archive.fs.dir:history server的歸檔目錄。該配置必須包含jobmanager.archive.fs.dir配置的目錄,以便history server能夠讀取到已完成的任務信息
- historyserver.archive.fs.refresh-interval: 刷新存檔作業目錄時間間隔
- state.backend: 存儲和檢查點的后台存儲。可選值為rocksdb、filesystem、hdfs。
- state.backend.fs.checkpointdir:檢查點數據文件和元數據的默認目錄。
- state.checkpoints.dir:保存檢查點的目錄
- state.savepoints.dir:save point的目錄
- state.checkpoints.num-retained:保留最近檢查點的數量
- state.backend.incremental:增量存儲
- akka.ask.timeout:jobmanager和task manager通信連接的超時時間。如果網絡擁擠經常出現超時錯誤,可以增大該配置
- akka.watch.heartbeat.interval:心跳發送間隔,用來檢測task manager的狀態
- akka.watch.heartbeat.pause:如果超過該時間仍未收到task manager的心跳,該task manager會被認為已掛掉
- taskmanager.network.memory.max:網絡緩存區最大內存大小
- taskmanager.network.memory.min:網絡緩存區最小內存大小
- taskmanager.network.memory.fraction:網絡緩沖區使用的內存占用總JVM內存的比例。如果配置了taskmanager.network.memory.max和taskmanager.network.memory.min的配置會被覆蓋
- fs.hdfs.hadoopconf:hadoop配置文件路徑(已被廢棄,建議使用HADOOP_CONF_DIR環境變量)
- yarn.application-attempts:job失敗嘗試次數,指jobmanager的重啟嘗試次數。該指不應該超過
yarn-site.xml
中的yarn.resourcemanager.am.max-attempts
的值
2、Flink HA(Job Manager)的配置
- high-availability:zookeeper
使用zookeeper負責HA實現
- high-availability.zookeeper.path.root:/flink
flink信息在zookeeper存儲節點的名稱
- high-availability.zookeeper.quorum:hadoop100:2181,hadoop101:2181,hadoop103:2181
zk集群節點的地址和端口
- high-availability.storageDir: hdfs://nameservice/flink/ha/
job manager元數據在文件系統存儲的位置,zk僅保存了指向該目錄的指針
3、Flink metrics監控相關配置
- metrics.reporters: prom
- metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
- metrics.reporter.prom.port: 9250-9260
4、Kafka相關調優配置
linger.ms/batch.size:這兩個配置項配合使用,可以在吞吐量和延遲中得到最佳的平衡點。
batch.size是kafka producer發送數據的批量大小,當數據量達到batch size的時候,會將這批數據發送出去,避免了數據一條一條的發送,頻繁建立和斷開網絡連接。但是如果數據量比較小,導致遲遲不能達到batch.size,為了保證延遲不會過大,kafka不能無限等待數據量達到batch.size的時候才發送。為了解決這個問題,引入了linger.ms配置項。當數據在緩存中的時間超過linger.ms時,無論緩存中數據是否達到批量大小,都會被強制發送出去
kafka topic分區數和Flink並行度的關系
Flink Kafka source的並行度需要和kafka topic的分區數一致。最大化利用kafka多分區topic的並行讀取能力
5、Yarn相關調優配置
- yarn.scheduler.maximum-allocation-vcores
- yarn.scheduler.minimum-allocation-vcores
Flink單個Task Manager的slot數量必須結余這兩個值之間
Flink的Job Manager和Task Manager內存不得超過container最大分配內存大小
- yarn.nodemanager.resource.cpu-vcores
yarn的虛擬cpu內核數,設置為物理機CPU核心數的2-3倍。會導致CPU資源無法被充分利用,跑任務的時候CPU占用率不高。