1. 當Source是Kafka的時候,如何設置Source Operator的並發度?
如果沒有指定,Source Operator的個數與集群中的TaskManager的個數相等。如果手動設置,建議使用的slot個數=Kafka Partition的個數/TaskManager的個數
。此時,Slot的個數需大於等於2.因為其中有一個Source Operator。也不建議在一個Slot中啟用多線程。
2. Barrier如果丟失了怎么辦?
因為Barrier是從Source開始周期性的發送的,所以過一段時間未被標記為阻塞的input channel會收到下一個checkpoint的barrier,這時Flink會進行比對,發現如果當前的檢查點沒有完成,但下一個checkpoint已經過來了,那么Flink會放棄當前的checkpoint,轉而使用下一個checkpoint。
3. 在Flink UI上Cancel Job,Job所有的任務都會停止嗎?
答:不是。Cancel按鈕只是把Source,Transform和Sink這些Operator停掉,對應的線程停掉
。但整個TaskManager還在
。所以,如果Job中如果有不在Operator中初始化Spring容器,那么即便Cancel Job以后,這些對象依然存在。所以,正確的姿勢是在Operator的open()方法中初始化Spring容器。在close()方法中釋放這些資源。
4. Job運行過程中TaskManager掛了怎么辦?
如果TaskManager掛了,Flink會先將Job cancel掉。然后再以相同的JobID,往集群中仍然存活的TaskManager上部署Job,這時候,如果還有足夠的task slot,則Job能夠恢復。但是這時候會有一個問題:部署在某些TaskManager上的Task數會比之前多,造成了這些TaskManager的負載較重,可能還是會出現問題。這時候就需要盡快恢復掛掉的TaskManager。
5. 某條數據在Input Channel之間傳輸失敗了怎么辦?
會拋出Exception,然后Job會重啟。
6. Flink讀取Kafka時,Checkpoint設置多久合適?
快照本身都是非常輕量級的,一般都在幾M或者幾十M。如果快照過大,比如幾百M甚至更多,就會對程序運行產生影響。官方給出的例子是幾秒鍾一次,具體可視Job情況決定。
7. Checkpoint和Savepoint有什么區別?
savepoint可以理解為是一種特殊的checkpoint,savepoint就是指向checkpoint的一個指針,需要手動觸發,而且不會過期,不會被覆蓋,除非手動刪除。正常情況下的線上環境是不需要設置savepoint的。除非對job或集群做出重大改動的時候,需要進行測試運行。
8. Flink的Operator不能帶成員變量?
Flink operator function中不能帶沒有實現flink序列化的成員變量。因為flink本身自己有一套序列化方式,在任務提交執行的時候會有validation,如果把沒有實現flink序列化的類作為成員變量,就會提交任務報錯。目前的解決方案是將operator function與實際業務邏輯分離。或將成員變為static。
9. 每個TaskManager設置多少個TaskSlot合適?
建議為CPU核數個。
10. TaskManager中的BufferPool不夠了咋辦?
需要增大配置項:taskmanager.network.numberOfBuffers
的值,該值表示網絡棧buffer的數量,它的大小表示在同一時刻該TaskManager能夠擁有的流處理的數據交換的channel數。
11. Job運行中出現了OOM
說明保留的空間不夠,這時需減少中間層的空間大小,通過配置降低taskmanager.memory.fraction
的值來減少中間層的內存占比。該值表示Flink用於管理底層buffer所占用的內存比例。
12. Job的並行度如何設置?
將所有的transform operator和sink operator的parallism設置成一樣的,source operator的parallism根據source而定。這樣的話,flink會自動把transform operator和sink operator 都merge成一個piple line去運行。那么這時候一個job就變成只有兩個operator了,source operator和merge后的operator,這個pipeline operator中間就沒有buffer了,性能最優。