1. 什么是KafkaConsumer?
應用程序使用KafkaConsul'le 「向Kafka 訂閱主題,並從訂閱的主題上接收消息。Kafka的消息讀取不同於從其他消息系統讀取數據,它涉及了一些獨特的概念和想法。
1.1 消費者和消費者群組
單個的消費者就跟前面的消息系統的消費者一樣,創建一個消費者對象,然后訂閱一個主題並開始接受消息,然后做自己的業務邏輯,但是Kafka天生就是支持體量很大的數據消費,如果只是使用單個的消費者消費消息,當生產者寫入消息的速度遠遠大於了消費者的速度,大量消息堆積在消費者上可能會導致性能反而降低或撐爆消費者,所以橫向伸縮是很有必要的,就想多個生產者可以向相同的主題寫消息一樣,我們也可以使用多個消費者從同一個主題讀取消息,對消息進行分流,這多個消費者就從屬於一個消費者群組。一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。
假設主題T1有四個分區,我們創建了消費者群組G1,創建了一個消費者C1從屬於G1,它是G1里的唯一的消費者,此時訂閱主題情況為,C1將會接收到主題中四個分區中的消息,如圖:

此時我們在消費者群組中新增一個消費者C2,那么每個消費者將分別從兩個分區接受消息,如圖:
如果我們有四個消費者時,將會每個消費者都分到一個分區。
如果群組中的消費者超過了主題的分區數,那么有一部分消費者就會被閑置,不會接收任何消息。如圖:

往群組里增加消費者是橫向伸縮消費能力的主要方式。
對於多個群組來說,每個群組都會從Kafka中接收到所有的消息,並且各個群組之間是互不干擾的。所以橫向伸縮Kafka消費者和消費者群組並不會對性能造成負面影響。簡而言之就是,為每一個需要獲取一個或多個主題全部消息的應用程序創建一個消費者群組,然后往群組里添加消費者來伸縮讀取能力和處理能力,群組里的每個消費者只處理一部分消息。如圖:

1.2 消費者群組和分區再均衡
一個新的消費者加入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生奔潰時,它就離開群組,原本由它讀取的分區將由群組里的其他消費者來讀取。在主題發生變化時, 比如管理員添加了新的分區,會發生分區重分配。分區的所有權從一個消費者變成了里另一個消費者,這樣的行為被稱為再均衡。再均衡非常重要, 它為消費者群組帶來了高可用性和伸縮性(我們可以放心地添加或移除消費者),不過在正常情況下,我們並不希望發生這樣的行為。在再均衡期間,消費者無法讀取消息,造成整個群組一小段時間的不可用。另外,當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。
消費者通過向被指派為群組協調器的broker (不同的群組可以有不同的協調器)發送心跳來維持它們和群組的從屬關系以及它們對分區的所有權關系。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪詢消息(為了獲取消息)或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。如果一個消費者發生崩潰,井停止讀取消息,群組協調器會等待幾秒鍾,確認它死亡了才會觸發再均衡。在這幾秒鍾時間里,死掉的消費者不會讀取分區里的消息。在清理消費者時,消費者會通知協調器它將要離開群組,協調器會立即觸發一次再均衡,盡量降低處理停頓。
2. 創建Kafka消費者並讀取消息
在創建KafkaConsumer之前,需要將消費者想要的屬性存放到Properties中,然后再將properties傳給KafkaConsumer。
Consuer也有三個必須的屬性。bootstrap.servers,這里跟Producer一樣,另外兩個key.deserializer和value.deserializer也與Producer類似,不過一個是序列化,一個是反序列化而已。
還有一個group.id不是必須的,但是我們通常都會指定改消費者屬於哪個群組,所以也可以認為是必須的。
設置Properties的代碼片段如下:
Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093"); //配置key-value允許使用參數化類型,反序列化 kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //指定消費者所屬的群組 kafkaPropertie.put("group.id","one");
接下來創建消費者,將Properties對象傳入到消費者,然后訂閱主題,如下:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie); /*訂閱主題,這里使用的是最簡單的訂閱testTopic主題,這里也可以出入正則表達式,來區分想要訂閱的多個指定的主題,如: *Pattern pattern = new Pattern.compile("testTopic"); * consumer.subscribe(pattern); */ consumer.subscribe(Collections.singletonList("testTopic"));
接下來輪詢消息,如下:
//輪詢消息 while (true) { //獲取ConsumerRecords,一秒鍾輪訓一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //消費消息,遍歷records for (ConsumerRecord<String, String> r : records) { LOGGER.error("partition:", r.partition()); LOGGER.error("topic:", r.topic()); LOGGER.error("offset:", r.offset()); System.out.println(r.key() + ":" + r.value()); } Thread.sleep(1000); }
生產者發送消息,然后查看消費者打印情況:
KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world0 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world1 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world2 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world3 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world4 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world5 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world6 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world7 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world8 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world9
只存在一個組群和一個消費者時:
當我們啟動兩個消費者,同一個組群,並在Topic上創建兩個Partition(分區),發送消息
final ProducerRecord<String, String> record = new ProducerRecord<String, String>("one",i % 2,"key3","hello world" + i);
將消息分發到0和1兩個partition
此時兩個消費者消費的消息總和等於發送的消息的總和,使用不同的群組的不同的訂閱同一個topic,每個消費者群組都能收到所有的消息。
輪詢不只是獲取數據那么簡單。在第一次調用新消費者的poll ()方法時,它會負責查找GroupCoordinator , 然后加入群組,接受分配的分區。如果發生了再均衡,整個過程也是在輪詢期間進行的。當然,心跳也是從輪詢里發送出去的。所以,我們要確保在輪詢期間所做的任何處理工作都應該盡快完成。
消費者完整代碼如下:
package com.wangx.kafka.client; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsuerDemo { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsuerDemo.class); public static void main(String[] args) throws InterruptedException { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093"); //配置key-value允許使用參數化類型,反序列化 kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //指定消費者所屬的群組 kafkaPropertie.put("group.id","1"); //創建KafkaConsumer,將kafkaPropertie傳入。 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie); /*訂閱主題,這里使用的是最簡單的訂閱testTopic主題,這里也可以出入正則表達式,來區分想要訂閱的多個指定的主題,如: *Pattern pattern = new Pattern.compile("testTopic"); * consumer.subscribe(pattern); */ consumer.subscribe(Collections.singletonList("one")); //輪詢消息 while (true) { //獲取ConsumerRecords,一秒鍾輪訓一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //消費消息,遍歷records for (ConsumerRecord<String, String> r : records) { LOGGER.error("partition:", r.partition()); LOGGER.error("topic:", r.topic()); LOGGER.error("offset:", r.offset()); System.out.println(r.key() + ":" + r.value()); } Thread.sleep(1000); } } }
3. 消費者的配置
1. fetch.min.bytes: 該屬性指定了消費者從服務器獲取記錄的最小字節數。
2. fetch.max.wait.ms:我們通過 fetch.min.byte告訴Kafka ,等到有足夠的數據時才把它返回給消費者。
而 fetch.max.wait.ms則用於指定broker 的等待時間
3. max.partition.fetch.bytes:默認值是1MB,該屬性指定了服務器從每個分區里返回給消費者的最大字節數.
4. session.timeout.ms: 默認3s,該屬性指定了消費者在被認為死亡之前可以與服務器斷開連接的時
5. auto.offset.reset:該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時井被刪除)該作何處
6. enable.auto.commit:該屬性指定了消費者是否自動提交偏移量,默認值是true。
7. partition.assignment.strategy: 分區分配給消費者群組的分配策略,有如下兩種策略:
Range:該策略會把主題的若干個連續的分區分配給消費者.
RoundRobin:該策略把主題的所有分區逐個分配給消費.
8. client.id:該屬性可以是任意字符串, broker 用它來標識從客戶端發送過來的消息,通常被用在日志、度量指標和配額里。
9. max.poll.records: 該屬性用於控制單次調用call () 方法能夠返回的記錄數量,可以幫你控制在輪詢里需要處理的數據量。
10. receive.buffer.bytes 和send.buffer.bytes: socket 在讀寫數據時用到的TCP 緩沖區也可以設置大小。如果它們被設為-1,就使用操作系統的默認值。如果生產者或消費者與broker處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
