Kafka Consumer API樣例


Kafka Consumer API樣例

1. 自動確認Offset

說明參照:http://blog.csdn.net/xianzhen376/article/details/51167333

Properties props = new Properties(); /* 定義kakfa 服務的地址,不需要將所有broker指定上 */ props.put("bootstrap.servers", "localhost:9092"); /* 制定consumer group */ props.put("group.id", "test"); /* 是否自動確認offset */ props.put("enable.auto.commit", "true"); /* 自動確認offset的時間間隔 */ props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); /* key的序列化類 */ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* value的序列化類 */ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* 定義consumer */ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 消費者訂閱的topic, 可同時訂閱多個 */ consumer.subscribe(Arrays.asList("foo", "bar")); /* 讀取數據,讀取超時時間為100ms */ while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); }

說明: 
1. bootstrap.servers 只是代表kafka的連接入口,只需要指定集群中的某一broker; 
2. 一旦consumer和kakfa集群建立連接,consumer會以心跳的方式來高速集群自己還活着,如果session.timeout.ms 內心跳未到達服務器,服務器認為心跳丟失,會做rebalence。

2. 手工控制Offset

如果consumer在獲得數據后需要加入處理,數據完畢后才確認offset,需要程序來控制offset的確認。舉個栗子: 
consumer獲得數據后,需要將數據持久化到DB中。自動確認offset的情況下,如果數據從kafka集群讀出,就確認,但是持久化過程失敗,就會導致數據丟失。我們就需要控制offset的確認。

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); /* 關閉自動確認選項 */ props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } /* 數據達到批量要求,就寫入DB,同步確認offset */ if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }

還可以精細的控制對具體分區具體offset數據的確認:

try {
    while(running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } /* 同步確認某個分區的特定offset */ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }

說明:確認的offset為已接受數據最大offset+1。

3. 分區訂閱

可以向特定的分區訂閱消息。但是會失去partion的負載分擔。有幾種場景可能會這么玩: 
1. 只需要獲取本機磁盤的分區數據; 
2. 程序自己或者外部程序能夠自己實現負載和錯誤處理。例如YARN/Mesos的介入,當consumer掛掉后,再啟動一個consumer。

String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
  • 1
  • 2
  • 3
  • 4

說明: 
1. 此種情況用了consumer Group,也不會做負載均衡。 
2. topic的訂閱和分區訂閱不可以在同一consumer中混用。

4. 外部存儲offset

消費者可以自定義kafka的offset存儲位置。該設計的主要目的是讓消費者將數據和offset進行原子性的存儲。這樣可以避免上面提到的重復消費問題。舉栗說明: 
訂閱特定分區。存儲所獲得的記錄時,將每條記錄的offset一起存儲。保證數據和offset的存儲是原子性的。當異步存儲被異常打斷時,凡已經存儲的數據,都有有相應的offset記錄。這種方式可以保證不會有數據丟失,也不會重復的從服務端讀取。 
如何配置實現: 
1. 去使能offset自動確認:enable.auto.commit=false; 
2. 從ConsumerRecord中獲取offset,保存下來; 
3. Consumer重啟時,調用seek(TopicPartition, long)重置在服務端的消費記錄。

如果消費分區也是自定義的,這種方式用起來會很爽。如果分區是自動分配的,當分區發生reblance的時候,就要考慮清楚了。如果因為升級等原因,分區漂移到一個不會更新offset的consumer上,那就日了狗了。 
該情況下: 
1. 原consumer需要監聽分區撤銷事件,並在撤銷時確認好offset。接口:ConsumerRebalanceListener.onPartitionsRevoked(Collection); 
2. 新consumer監聽分區分配事件,獲取當前分區消費的offset。接口:ConsumerRebalanceListener.onPartitionsAssigned(Collection); 
3. consumer監聽到 ConsumerRebalance事件,還沒有處理或者持久化的緩存數據flush掉。

5. 控制消費位置

大多數情況下,服務端的Consumer的消費位置都是由客戶端間歇性的確認。Kafka允許Consumer自己設置消費起點,達到的效果: 
1. 可以消費已經消費過的數據; 
2. 可以跳躍性的消費數據; 
看下這樣做的一些場景: 
1. 對Consumer來說,數據具備時效性,只需要獲取最近一段時間內的數據,就可以進行跳躍性的獲取數據; 
2. 上面自己存offset的場景,重啟后就需要從指定的位置開始消費。 
接口上面已經提到過了,用seek(TopicPartition, long)。、 
麻蛋,說指針不就好了,這一小節就是多余的叨叨。

6. 控制消費流Consumption Flow Control

如果一個consumer同時消費多個分區,默認情況下,這多個分區的優先級是一樣的,同時消費。Kafka提供機制,可以讓暫停某些分區的消費,先獲取其他分區的內容。場景舉栗: 
1. 流式計算,consumer同時消費兩個Topic,然后對兩個Topic的數據做Join操作。但是這兩個Topic里面的數據產生速率差距較大。Consumer就需要控制下獲取邏輯,先獲取慢的Topic,慢的讀到數據后再去讀快的。 
2. 同樣多個Topic同時消費,但是Consumer啟動是,本地已經存有了大量某些Topic數據。此時就可以優先去消費下其他的Topic。

調控的手段:讓某個分區消費先暫停,時機到了再恢復,然后接着poll。接口:pause(TopicPartition…),resume(TopicPartition…)

7. 多線程處理模型 Multi-threaded Processing

Kafka的Consumer的接口為非線程安全的。多線程共用IO,Consumer線程需要自己做好線程同步。 
如果想立即終止consumer,唯一辦法是用調用接口:wakeup(),使處理線程產生WakeupException。看磚:

public class KafkaConsumerRunner implements Runnable { /* 注意,這倆貨是類成員變量 */ private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }

說明: 
1. KafkaConsumerRunner是runnable的,請自覺補腦多線程運行; 
2. 外部線程控制KafkaConsumerRunner線程的停止; 
3. 主要說的是多線程消費同一topic,而不是消費同一分區;

比較一下兩種模型:

Consumer單線程模型

優點:實現容易; 
優點:沒有線程之間的協作。通常比下面的那種更快; 
優點:單分區數據的順序處理; 
缺點:多個TCP連接,但是關系不大,kafka對自己的server自信滿滿; 
缺點:太多的Request可能導致server的吞吐降低一丟丟; 
缺點:consumer數量受到分區數量限制,一個consumer一個分區;

Consumer多線程模型

優點:一個consumer任意多的線程,線程數不用受到分區數限制; 
缺點:如果有保序需求,自己要加控制邏輯; 
缺點:該模型中如果手動offset,自己要加控制邏輯; 
一種可行的解決辦法:為每個分區分配獨立的存儲,獲取的數據根據數據所在分區進行hash存儲。這樣可以解決順序消費,和offset的確認問題。

后記

其實對於官網上說的,我是迷惑的: 
對比兩種線程模型時,應該是有隱藏地圖的。 
1. 單線程模型中,多分區情況下,應該說的是每個Consumer獨立去消費一個分區; 
2. 多線程模型中,單Consumer消費一個Topic。如果多個線程同時消費同一分區,也就是要公用連接了,各個線程之間要做好同步; 
3. 對於多線程模型下提出的客戶端分區數據分開存儲,各個分區之間是如何保序的? 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM