kafka在單consumer消費順序性和offset都沒問題。(特殊情況下會出現順序性有點誤差),但是在多個consumer下消費數據需要平衡consumer對應的partition消費。平衡過程有兩種方式,一種是由用戶自己設置consumer進行分配(制定consumer消費對應的partition)
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
但是這種方式storm無法用,因為storm的spout是自動分配到多個JVM進行啟動的,想區分進行手動分配partition比較麻煩
方法二:
consumer.subscribe(Arrays.asList(topicName.split(",")[0],topicName.split(",")[1],topicName.split(",")[2]), new ConsumerRebalanceListener(){
@Override
public void onPartitionsRevoked(
Collection<TopicPartition> partitions) {
// System.out.printf("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());
//consumer.commitSync(offsetsMap);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(
Collection<TopicPartition> partitions) {
// System.out.printf("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
//consumer.commitSync();
offsetsMap.clear();
}});
用這個方式時候一直出現一個bug,就是storm在啟動的時候出現擠壓數據重復讀取問題。排查了兩天發現是kafka在consumer平衡時候需要進行offset提交。(onPartitionsRevoked進行提交)。自此平衡之前提交,后面再加進來的consumer再分配時候就沒問題了。數據保持一致性。