一、消費者和消費者群組
在 Kafka 中,消費者通常是消費者群組的一部分,多個消費者群組共同讀取同一個主題時,彼此之間互不影響。Kafka 之所以要引入消費者群組這個概念是因為 Kafka 消費者經常會做一些高延遲的操作,比如把數據寫到數據庫或 HDFS ,或者進行耗時的計算,在這些情況下,單個消費者無法跟上數據生成的速度。此時可以增加更多的消費者,讓它們分擔負載,分別處理部分分區的消息,這就是 Kafka 實現橫向伸縮的主要手段。
注意的是:同一個分區只能被同一個消費者群組里面的一個消費者讀取,不可能存在同一個分區被同一個消費者群里多個消費者共同讀取的情況,如圖:
可以看到即便消費者 Consumer5 空閑,但是也不會去讀取任何一個分區的數據,同時也提醒在使用時應該合理設置消費者的數量,以免造成閑置和額外開銷。
分區再均衡
群組里的消費者共同讀取主題的分區,所以當一個消費者被關閉或發生崩潰時,它就離開了群組,原本由它讀取的分區將由群組里的其他消費者來讀取。同時在主題發生變化時 , 比如添加了新的分區,也會發生分區與消費者的重新分配,分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。正是因為再均衡,所以消費費者群組才能保證高可用性和伸縮性。
消費者通過向群組協調器所在的 broker 發送心跳來維持它們和群組的從屬關系以及它們對分區的所有權。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪詢消息或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發再均衡。
創建Kafka消費者
在創建消費者的時候以下以下三個選項是必選的:
- bootstrap.servers :指定 broker 的地址清單,清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找 broker 的信息。不過建議至少要提供兩個 broker 的信息作為容錯;
- key.deserializer :指定鍵的反序列化器;
- value.deserializer :指定值的反序列化器。
除此之外還需要指明需要想訂閱的主題,可以使用如下兩個 API :
- consumer.subscribe(Collection<String> topics) :指明需要訂閱的主題的集合;
- consumer.subscribe(Pattern pattern) :使用正則來匹配需要訂閱的集合。
最后需要通過輪詢 API(poll
) 向服務器定時請求數據。一旦消費者訂閱了主題,輪詢就會處理所有的細節,包括群組協調、分區再均衡、發送心跳和獲取數據,使得開發者只需要關注從分區返回的數據,然后進行業務處理。 示例如下:
public class SimpleConsumer { public static void main(String[] args) { String topic = "my-topic"; String group = "group"; Properties prop = new Properties(); prop.put("bootstrap.servers", "192.168.21.120:9092"); prop.put("group.id", group); prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(prop); kafkaConsumer.subscribe(Collections.singletonList(topic)); try { while (true) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.of(100, ChronoUnit.MILLIS)); records.forEach(record -> { System.out.printf("topic=%s,partition=%d,key=%s,value=%s,offset=%s\n", record.topic(), record.partition(), record.key(), record.value(), record.offset()); }); } } finally { kafkaConsumer.close(); } } }
偏移量的重要性
Kafka 每一條消息都有一個偏移量屬性,記錄其在分區中的位置,偏移量是一個單調遞增的整數。消費者通過往一個叫作 _consumer_offset
的特殊主題發送消息,消息里包含每個分區的偏移量。 如果消費者一直處於運行狀態,那么偏移量就沒有 什么用處。不過,如果有消費者退出或者新分區加入,此時就會觸發再均衡。完成再均衡之后,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的偏移量,然后從偏移量指定的地方繼續處理。 因為這個原因,所以如果不能正確提交偏移量,就可能會導致數據丟失或者重復出現消費,比如下面情況:
- 如果提交的偏移量小於客戶端處理的最后一個消息的偏移量 ,那么處於兩個偏移量之間的消息就會被重復消費;
- 如果提交的偏移量大於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息將會丟失。
自動提交偏移量
Kafka 支持自動提交和手動提交偏移量兩種方式。
只需要將消費者的 enable.auto.commit
屬性配置為 true
即可完成自動提交的配置。 此時每隔固定的時間,消費者就會把 poll()
方法接收到的最大偏移量進行提交,提交間隔由 auto.commit.interval.ms
屬性進行配置,默認值是 5s。
使用自動提交是存在隱患的,假設使用默認的 5s 提交時間間隔,在最近一次提交之后的 3s 發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后了 3s ,所以在這 3s 內到達的消息會被重復處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復消息的時間窗,不過這種情況是無法完全避免的。基於這個原因,Kafka 也提供了手動提交偏移量的 API,使得用戶可以更為靈活的提交偏移量。
手動提交偏移量
用戶可以通過將 enable.auto.commit
設為 false
,然后手動提交偏移量。基於用戶需求手動提交偏移量可以分為兩大類:
- 手動提交當前偏移量:即手動提交當前輪詢的最大偏移量;
- 手動提交固定偏移量:即按照業務需求,提交某一個固定的偏移量。
而按照 Kafka API,手動提交偏移量又可以分為同步提交和異步提交
同步提交
通過調用 consumer.commitSync()
來進行同步提交,不傳遞任何參數時提交的是當前輪詢的最大偏移量。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*同步提交*/ consumer.commitSync(); }
如果某個提交失敗,同步提交還會進行重試,這可以保證數據能夠最大限度提交成功,但是同時也會降低程序的吞吐量。基於這個原因,Kafka 還提供了異步提交的 API。
異步提交
異步提交可以提高程序的吞吐量,因為此時可以盡管請求數據,而不用等待 Broker 的響應。代碼如下:
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*異步提交並定義回調*/ consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.out.println("錯誤處理"); offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset())); } } }); }
異步提交存在的問題是,在提交失敗的時候不會進行自動重試,實際上也不能進行自動重試。假設程序同時提交了 200 和 300 的偏移量,此時 200 的偏移量失敗的,但是緊隨其后的 300 的偏移量成功了,此時如果重試就會存在 200 覆蓋 300 偏移量的可能。同步提交就不存在這個問題,因為在同步提交的情況下,300 的提交請求必須等待服務器返回 200 提交請求的成功反饋后才會發出。基於這個原因,某些情況下,需要同時組合同步和異步兩種提交方式。
同步加異步提交
下面這種情況,在正常的輪詢中使用異步提交來保證吞吐量,但是因為在最后即將要關閉消費者了,所以此時需要用同步提交來保證最大限度的提交成功。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } // 異步提交 consumer.commitAsync(); } } catch (Exception e) { e.printStackTrace(); } finally { try { // 因為即將要關閉消費者,所以要用同步提交保證提交成功 consumer.commitSync(); } finally { consumer.close(); } }
提交特定偏移量
在上面同步和異步提交的 API 中,實際上都沒有對 commit 方法傳遞參數,此時默認提交的是當前輪詢的最大偏移量,如果你需要提交特定的偏移量,可以調用它們的重載方法。
/*同步提交特定偏移量*/ commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) /*異步提交特定偏移量*/ commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
注意的是,因為可以訂閱多個主題,所以 offsets
中必須要包含所有主題的每個分區的偏移量,示例代碼如下
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); /*記錄每個主題的每個分區的偏移量*/ TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData"); /*TopicPartition 重寫過 hashCode 和 equals 方法,所以能夠保證同一主題和分區的實例不會被重復添加*/ offsets.put(topicPartition, offsetAndMetadata); } /*提交特定偏移量*/ consumer.commitAsync(offsets, null); } } finally { consumer.close(); }
因為分區再均衡會導致分區與消費者的重新划分,有時候可能希望在再均衡前執行一些操作:比如提交已經處理但是尚未提交的偏移量,關閉數據庫連接等。此時可以在訂閱主題時候,調用 subscribe
的重載方法傳入自定義的分區再均衡監聽器。
/*訂閱指定集合內的所有主題*/ subscribe(Collection<String> topics, ConsumerRebalanceListener listener) /*使用正則匹配需要訂閱的主題*/ subscribe(Pattern pattern, ConsumerRebalanceListener listener)
代碼示例
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { /*該方法會在消費者停止讀取消息之后,再均衡開始之前就調用*/ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("再均衡即將觸發"); // 提交已經處理的偏移量 consumer.commitSync(offsets); } /*該方法會在重新分配分區之后,消費者開始讀取消息之前被調用*/ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } }); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData"); /*TopicPartition 重寫過 hashCode 和 equals 方法,所以能夠保證同一主題和分區的實例不會被重復添加*/ offsets.put(topicPartition, offsetAndMetadata); } consumer.commitAsync(offsets, null); } } finally { consumer.close(); }
Kafka 提供consumer.wakeup()
方法用於退出輪詢,通過拋出 WakeupException
異常來跳出循環。注意的是,在退出線程時最好顯示的調用 consumer.close()
, 此時消費者會提交任何還沒有提交的東西,並向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡 ,而不需要等待會話超時。
監聽控制台輸出,當輸入 exit
時結束輪詢,關閉消費者並退出程序:
/*調用 wakeup 優雅的退出*/ final Thread mainThread = Thread.currentThread(); new Thread(() -> { Scanner sc = new Scanner(System.in); while (sc.hasNext()) { if ("exit".equals(sc.next())) { consumer.wakeup(); try { /*等待主線程完成提交偏移量、關閉消費者等操作*/ mainThread.join(); break; } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> rd : records) { System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset()); } } } catch (WakeupException e) { //對於 wakeup() 調用引起的 WakeupException 異常可以不必處理 } finally { consumer.close(); System.out.println("consumer 關閉"); }
獨立的消費者
Kafka 的設計目標是高吞吐和低延遲,在 Kafka 中,消費者通常都是從屬於某個群組的,因為單個消費者的處理能力是有限的。但是某些時候需求可能很簡單,比如可能只需要一個消費者從一個主題的所有分區或者某個特定的分區讀取數據,這個時候就不需要消費者群組和再均衡了, 只需要把主題或者分區分配給消費者,然后開始讀取消息井提交偏移量即可。
在這種情況下,就不需要訂閱主題, 取而代之的是消費者為自己分配分區。 一個消費者可以訂閱主題(井加入消費者群組),或者為自己分配分區,但不能同時做這兩件事情。 分配分區的示例代碼如下:
List<TopicPartition> partitions = new ArrayList<>(); List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); /*可以指定讀取哪些分區 如這里假設只讀取主題的 0 分區*/ for (PartitionInfo partition : partitionInfos) { if (partition.partition()==0){ partitions.add(new TopicPartition(partition.topic(), partition.partition())); } } // 為消費者指定分區 consumer.assign(partitions); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<Integer, String> record : records) { System.out.printf("partition = %s, key = %d, value = %s\n", record.partition(), record.key(), record.value()); } consumer.commitSync(); }
fetch.min.byte
消費者從服務器獲取記錄的最小字節數。如果可用的數據量小於設置值,broker 會等待有足夠的可用數據時才會把它返回給消費者。
fetch.max.wait.ms
broker 返回給消費者數據的等待時間,默認是 500ms。
max.partition.fetch.bytes
該屬性指定了服務器從每個分區返回給消費者的最大字節數,默認為 1MB。
session.timeout.ms
消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是 3s。
auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:
- latest (默認值) :在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的最新記錄);
- earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。
enable.auto.commit
是否自動提交偏移量,默認值是 true。為了避免出現重復消費和數據丟失,可以把它設置為 false。
client.id
客戶端 id,服務器用來識別消息的來源。
max.poll.records
單次調用 poll()
方法能夠返回的記錄數量。
receive.buffer.bytes & send.buffer.byte
這兩個參數分別指定 TCP socket 接收和發送數據包緩沖區的大小,-1 代表使用操作系統的默認值。