Kafka-消費者(訂閱主題消費數據及常用調優參數解析)


Kafka-消費者基礎使用及常用參數解析

消費者和消費者群組

kafka消費者從屬於消費者群組。一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。

假設主題T14個分區,我們創建了消費者C1,它是群組G1里唯一的消費者,我們用它訂閱主題T1。消費者C1將收到主題T1全部的4個分區的消息,如下

 

 

 

如果在群組G1里新增一個消費者C2,那么每個消費者將分別從兩個分區接收消息。如下

 

 

 

如果群組G14個消費者,那么每個消費者可以分配到一個分區

 

 

 

如果我們往群組里添加更多的消費者,超過主題的分區數量,那么有一部分消費者就會被閑置,不會接收到任何消息,如下圖

 

 

往群組里增加消費者是橫向伸縮消費能力的主要方式。kafka消費者經常會做一些高延遲的操作,比如把數據寫到數據庫或HDFS,或者使用數據進行比較耗時的計算。在這些情況下,單個消費者無法跟上數據生成的速度,可以增加更多的消費者,讓它們分擔負載,每個消費者只處理部分分區的消息,這就是橫向伸縮的主要手段。我們有必要為主題創建大量的分區,在負載增長時可以加入更多的消費者。但是要注意,不能讓消費者的數量超過主題分區的數量,多余的消費者會被閑置。

 

多個應用程序從同一個主題讀取數據,kafka設計的主要目標之一,就是要讓kafka主題里的數據能夠滿足企業各種應用場景的需求。在這些場景里,每個應用程序可以獲取到所有的消息,而不只是其中的一部分,只要保證每個應用程序有自己的消費者群組,就可以讓它們獲取到主題的所有消息。

橫向伸縮kafka消費者和消費者群組並不會對性能造成負面影響。

 

 

 

 

消費者群組和分區再均衡

群組里的消費者共同讀取主題的分區。一個新的消費者加入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩潰時,它就離開群組,原本由它讀取的分區將由群組里的其它消費者來讀取。在主題發生變化時,比如添加了新的分區,會發生分區重分配。

分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。再均衡非常重要,它為消費者群組帶來了高可用性和伸縮性(我們可以放心的添加或移除消費者)。

再均衡期間,消費者無法讀取消息,造成整個群組一小段時間的不可用,所以我們不希望發生這樣的行為。當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。

消費者通過向被指派為群組協調器的broker(不同的群組可以有不同的協調器)發送心跳來維持它們和群組的從屬關系以及它們對分區的所有權關系。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪訓消息或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。

如果一個消費者發生崩潰,並停止讀取消息,群組協調器會等待幾秒鍾,確認它死亡了才會觸發再均衡。在這幾秒鍾時間里,死掉的消費者不會讀取分區里的消息。在清理消費者時,消費者會通知協調器它將要離開群組,協調器會立即觸發一次再均衡,盡量降低處理停頓。

分配分區的過程:

當消費者要加入群組時,它會向群組協調器發送一個joinGroup請求。第一個加入群組的消費者將成為群主。群主從協調器那里獲得群組的成員列表(列表中包含了所有最近發送過心跳的消費者,它們被認為是活躍的),並負責給每一個消費者分配分區。它使用一個實現了PartitionAssignor接口的類來決定哪些分區應該被分配給哪個消費者。群主把分配情況列表發送給群組協調器,協調器再把這些信息發送給所有消費者。每個消費者只能看到自己的分配信息,只有群主知道群組里所有消費者的分配信息。這個過程會在每次再均衡時重復發生。

 

kafka消費者使用

 

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/**
 * @Author FengZhen
 * @Date 2020-04-06 11:07
 * @Description kafka消費者
 */
public class KafkaConsumerTest {
    private static Properties kafkaProps = new Properties();
    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps);
        //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。
        //如:test.*,訂閱test相關的所有主題
        consumer.subscribe(Collections.singleton("test_partition"));
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。
                //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞)
                //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據
                //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            //退出應用前使用close方法關閉消費者。
            //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。
            consumer.close();
        }

    }
}

 

 

輪詢不只是獲取數據那么簡單。在第一次調用新消費者的poll方法時,它會負責查找GroupCoordinator,然后加入群組,接收分配的分區。如果發生了再均衡,整個過程也是在輪詢期間進行的。心跳也是從輪詢里發出的。所以,需要確保在輪詢期間所做的任何處理工作盡可能快的完成。

在同一個群組里,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全地共享一個消費者。按照規則,一個消費者使用一個線程。如果要在同一個消費者群組里運行多個消費者,需要讓每個消費者運行在自己的線程里。

  

消費者的常用配置

1.fetch.min.bytes

該屬性指定了消費者從服務器獲取記錄的最小字節數。broker在收到消費者的數據請求時,如果可用的數據量小於fetch.min.bytes指定的大小,那么它會等到有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和broker的工作負載,因為它們在主題不是很活躍的時候就不需要來來回回地處理消息。如果沒有很多可用的數據,但消費者的CPU使用率卻很高,可以將此屬性值設置的比默認值大。如果消費者的數量較多,把該屬性值的值設置的大一點可以降低broker的工作負載。

2.fetch.max.wait.ms

該屬性指定broker返回消息的等待時間,默認是500ms。如果沒有足夠的數據流入kafka,消費者獲取最小數據量的要求就得不到滿足,最終導致500ms的延遲。如果要降低潛在的延遲(為了滿足SLA),可以把該參數值設置的小一些。如果fetch.max.wait.ms被設為100ms,並且fetch.min.bytes被設為1MBkafka在收到消費者的請求后,要么返回1MB的數據,要么在100ms后返回可用的數據,只要有一個條件滿足了,就會立馬返回。

3.max.partition.fetch.bytes

該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值是1MBKafkaConsumer.poll()方法從每個分區里返回的記錄最多不超過max.partition.fetch.bytes指定的字節。如果一個主題有20個分區和5個消費者,那么每個消費者需要至少4MB的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組里有消費者發生崩潰,剩下的消費者需要處理更多的分區。

max.partition.fetch.bytes的值必須比broker能夠接收的最大消息的字節數(max.message.size)大,否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。

在設置此值時,還需要考慮消費者處理數據的時間。消費者需要頻繁的調用poll()方法來避免會話過期和發生分區的再均衡,如果單次調用poll()返回的數據太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。出現這種情況,可以把max.partition.fetch.bytes改小,或者延長會話過期時間。

4.session.timeout.ms

該屬性值指定了消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是3s。如果消費者沒有在session.timeout.ms指定的時間內發送心跳給群組協調器,就被認為已經死亡,協調器就會觸發再均衡,把它的分區分配給群組里的其它消費者。heartbeat.interval.ms指定了poll()方法向協調器發送心跳的頻率,session.timeout.ms則指定了消費者可以多久不發送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms必須比session.timeout.ms小,一般是session.timeout.ms的三分之一。

session.timeout.ms調小:可以更快地檢測和恢復崩潰的節點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。

session.timeout.ms調大:可以減少意外的再均衡,不過檢測節點崩潰需要更長的時間。

5.auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時並被刪除)該作何處理。它的默認值是latest,偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)。另一個值是earliest,偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。

6.enable.auto.commit

該屬性指定了消費者是否自動提交偏移量,默認值是true。為了盡量避免出現重復數據和數據丟失,可以把它設為false,由自己控制何時提交偏移量。如果把它設為true,還可以通過配置auto.commit.interval.ms屬性來控制提交的頻率。

7.partition.assignment.strategy

分區會被分配給群組里的消費者。PartitionAssignor根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者。kafka有兩個默認的分配策略

Range(默認):該策略會把主題的若干個連續的分區分配給消費者。假設消費者C1C2同時訂閱了主題T1和主題T2,並且每個主題有3個分區。那么消費者C1有可能分配到這兩個主題的分區0和分區1,四個分區;而消費者C2分配到這兩個主題的分區2,兩個分區。因為每個主題擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最后分配到比第二個消費者更多的分區。只要使用了Range策略,而且分區數量無法被消費者數量整除,就會出現這種情況。

org.apache.kafka.clients.consumer.RangeAssignor

RoundRobin:該策略把主題的所有分區逐個分配給消費者。如果使用RoundRobin策略來給消費者C1和消費者C2分配分區,那么消費者C1將分到主題T1的分區0和分區2以及T2主題的分區1;消費者C2將分配到主題T1的分區1以及主體T2的分區0和分區2.一般來說,如果所有消費者都訂閱相同的主題,RoundRobin策略會給所有消費者分配相同數量的分區(最多差一個分區)

org.apache.kafka.clients.consumer.RoundRobinAssignor

8.client.id

該屬性可以是任意字符串,broker用它來標記從客戶端發送過來的消息,通常被用在日志、度量指標和配額里。

9.max.poll.records

該屬性用於控制單次調用poll()方法能夠返回的記錄數量,可以控制在輪詢里需要處理的數據量。

10.receive.buffer.bytessend.buffer.bytes

socket在讀寫數據時用到的TCP緩沖區也可以設置大小。如果它們被設為-1,就使用操作系統的默認值。如果生產者或消費者與broker處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM