With a single consumer, Kafka has no problems with either consumption order or offsets (in rare cases the ordering can be slightly off). With multiple consumers, however, the partitions have to be balanced across the consumers. This balancing can be done in two ways. The first is for the user to assign partitions manually, i.e. specify which partitions each consumer reads:
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
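For reference, a self-contained sketch of this manual-assignment approach might look like the following. The broker address, group id, topic name, and deserializers are placeholders I added, and it assumes a kafka-clients version that supports poll(Duration):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "manual-assign-demo");         // placeholder group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "demo-topic";                          // placeholder topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() pins this consumer to fixed partitions; the group coordinator
            // never rebalances them to other consumers.
            TopicPartition partition0 = new TopicPartition(topic, 0);
            TopicPartition partition1 = new TopicPartition(topic, 1);
            consumer.assign(Arrays.asList(partition0, partition1));

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
            consumer.commitSync(); // commit the positions consumed in this poll
        }
    }
}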
However, this approach does not work with Storm: Storm launches spouts automatically across multiple JVMs, so trying to manually assign specific partitions to each spout instance is awkward.
Method 2:
consumer.subscribe(
        Arrays.asList(topicName.split(",")[0], topicName.split(",")[1], topicName.split(",")[2]),
        new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit the offsets processed so far before the partitions are
                // revoked, so the consumer that takes them over does not re-read them.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // New assignment: discard offsets tracked for the previous assignment.
                offsetsMap.clear();
            }
        });
With this approach I kept hitting one bug: when the Storm topology started, the backlog of data was read repeatedly. After two days of troubleshooting, the cause turned out to be that Kafka needs the offsets committed when the consumers rebalance (commit in onPartitionsRevoked). Once the offsets are committed before the rebalance, consumers that join later get their partitions reassigned without any problem, and the data stays consistent.
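To make the fix concrete, here is a minimal sketch of the whole consumer loop built around that idea. The broker address, group id, and topic list are placeholders; the post does not show how offsetsMap is populated, so this sketch assumes it tracks the next offset to commit per partition and commits that map explicitly (the post's listener uses the no-argument commitSync() instead), and it assumes a kafka-clients version with poll(Duration):

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceSafeConsumer {
    // Assumed bookkeeping: next offset to commit for each partition this consumer owns.
    private static final Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "storm-spout-demo");            // placeholder group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topicName = "topicA,topicB,topicC";             // placeholder topic list
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(topicName.split(",")), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit what has been processed before the partitions move, so the
                // next owner starts after the committed position instead of re-reading.
                consumer.commitSync(offsetsMap);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // New assignment: drop offsets tracked for the old assignment.
                offsetsMap.clear();
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // ... process the record here, then remember the next offset to commit
                offsetsMap.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            consumer.commitSync(offsetsMap); // periodic commit of processed offsets
        }
    }
}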