關於kafka多個consumer消費者問題(新版API)


kafka在單consumer消費順序性和offset都沒問題。(特殊情況下會出現順序性有點誤差),但是在多個consumer下消費數據需要平衡consumer對應的partition消費。平衡過程有兩種方式,一種是由用戶自己設置consumer進行分配(制定consumer消費對應的partition)

TopicPartition partition0 = new TopicPartition(topic, 0);

TopicPartition partition1 = new TopicPartition(topic, 1);

consumer.assign(Arrays.asList(partition0, partition1));

但是這種方式storm無法用,因為storm的spout是自動分配到多個JVM進行啟動的,想區分進行手動分配partition比較麻煩

方法二:

consumer.subscribe(Arrays.asList(topicName.split(",")[0],topicName.split(",")[1],topicName.split(",")[2]), new ConsumerRebalanceListener(){
@Override
public void onPartitionsRevoked(
Collection<TopicPartition> partitions) {
// System.out.printf("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());

//consumer.commitSync(offsetsMap);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(
Collection<TopicPartition> partitions) {
// System.out.printf("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
//consumer.commitSync();
offsetsMap.clear();
}});

用這個方式時候一直出現一個bug,就是storm在啟動的時候出現擠壓數據重復讀取問題。排查了兩天發現是kafka在consumer平衡時候需要進行offset提交。(onPartitionsRevoked進行提交)。自此平衡之前提交,后面再加進來的consumer再分配時候就沒問題了。數據保持一致性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM