Kafka 2.6新功能:消費者主動觸發Rebalance


Kafka 2.6引入的新功能:消費者能夠主動觸發Rebalance。一直以來,Rebalance的觸發都是由Coordinator來執行的,但有些場景下消費者端能夠主動觸發Rebalance會很有必要。舉個例子,在ConsumerPartitionAssignor接口中有個subscriptionUserData方法可以實現自定義的用戶數據。之后在進行Rebalance時,Leader消費者可以根據這些自定義的用戶數據執行特定邏輯的分區消費分配方案的制定。目前,倘若這些用戶數據發生了變更,Kafka是不會開啟Rebalance的。如果這些自定義數據非常影響分配方案的制定,那么唯一的做法就是調用KafkaConsumer的unsubscribe方法,顯式地告知Coordinator成員要退出組。這樣做的弊端在於,unsubscribe方法會回收當前已分配的分區。

 

針對這種場景,如果消費者能用“不退出組”的方式主動地發起Rebalance去刷新或提交最新的自定義用戶數據,那么就可以解決這個場景中碰到的問題了。基於這些考量,社區於2.6引入了這個功能。主要體現在KafkaConsumer中增加了enforceRebalance方法。如下所示:

@Override
    public void enforceRebalance() {
        acquireAndEnsureOpen();
        try {
            if (coordinator == null) {
                throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
            }
            coordinator.requestRejoin();
        } finally {
            release();
        }
    }  

可見,enforceRebalance方法就是調用requestRejon方法請求Rebalance的,而不像unsubscribe那樣用顯式離開組的方式觸發Rebalance。以下是unsubscribe的源碼,可以對比感受一下:

public void unsubscribe() {
        acquireAndEnsureOpen();
        try {
            fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
            if (this.coordinator != null) {
                this.coordinator.onLeavePrepare();
                this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
            }
            this.subscriptions.unsubscribe();
            log.info("Unsubscribed all topics or patterns and assigned partitions");
        } finally {
            release();
        }
    }  

可以看到,unsubscribe的邏輯要“重”得多,它讓消費者主動發起離組操作,從而被回收了所有已分配給它的分區。

 

enforceRebalance方法是一個非阻塞方法,而且它本身不會引發Rebalance。它僅僅是將標識是否需要重新加入組的布爾型變量置為true,因此若要Rebalance被觸發,還需要顯式調用poll方法。常見的使用方式如下:

consumer.enforceRebalance();
consumer.poll(Duration.ofMillis(500));

 

那么,這個功能應該在什么時候被使用呢?通常情況下,你不需要用到這個功能,因為Coordinator會幫你料理Rebalance的事情。使用這個功能有兩個條件:

  • 你要使用ConsumerPartitionAssignor接口用於執行分配方案的制定
  • 你要指定subscriptionUserData並且使用它幫助制定分配方案

如果不同時滿足這兩個條件,使用enforceRebalance方法的意義就不大了。

 

最后說一下兩種場景下調用該方法的處理邏輯。

第一種情況:Rebalance已經發生時調用enforceRebalance方法。這是很可能出現的情況,即consumer調用enforceRebalance時Rebalance已經發生。Consumer可能正在等待Coordinator發生分配方案,或者等待其他成員加入組。這時,我們面臨幾種選擇:1、讓當前Rebalance正常完成,之后不再觸發一次Rebalance;2、1、讓當前Rebalance正常完成,之后再次觸發一次Rebalance。很明顯,無論哪種都是合理的,沒有什么絕對的對錯。目前的結論是Kafka把是否再次觸發的權利交給用戶來決定。當前Rebalance正常完成后,用戶可以自行檢查接收到的分配方案是否符合期望或者是否使用上了變更過的userData。然后再決定是否重試剛剛的enforceRebalance方法。

 

第二種情況:該consumer不屬於消費者組,可能是因為已經被踢出組或者是尚未加入組。如果此時消費者組未處於Rebalance流程,那么此時消費者還沒來得及發送最及時的userData,那么下次Rebalance一定可以包含最新的userData。此時可以嘗試重試Rebalance。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM