KafkaSpout 重復消費問題解決


使用https://github.com/nathanmarz/storm-contrib來對接Kafka0.7.2時, 發現kafkaSpout總會進行數據重讀, 配置都無問題, 也沒報錯

進行debug之后, 發現是由於自己寫的blot繼承於IBolt, 但自己沒有在代碼中顯示的調用collector.ack(); 導致kafkaSpout一直認為emitted的數據有問題, 超時之后進行數據重發

KafkaSpout中關鍵代碼如下:

PartitionManager.java

public void commit() {
LOG.info("Committing offset for " + _partition);
long committedTo;
if(_pending.isEmpty()) {
committedTo = _emittedToOffset;
} else {
committedTo = _pending.first();
}
if(committedTo!=_committedTo) {
LOG.info("Writing committed offset to ZK: " + committedTo);

Map<Object, Object> data = (Map<Object,Object>)ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", committedTo)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);

LOG.info("Wrote committed offset to ZK: " + committedTo);
_committedTo = committedTo;
}
LOG.info("Committed offset " + committedTo + " for " + _partition);
}

如果Bolt不進行ack, 則紅色代碼處的offsetNumber永遠相等, 導致一直不進行offset的回寫操作

 

解決方案:

1. IBolt中顯式調用collector.ack();

2. 使用幫你封裝好的BaseBasicBlot, 它會幫你自動調用ack的

關於Ack的問題, 可以參考我的翻譯和官網文章: http://www.cnblogs.com/zhwbqd/p/3960991.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM