使用https://github.com/nathanmarz/storm-contrib來對接Kafka0.7.2時, 發現kafkaSpout總會進行數據重讀, 配置都無問題, 也沒報錯
進行debug之后, 發現是由於自己寫的blot繼承於IBolt, 但自己沒有在代碼中顯示的調用collector.ack(); 導致kafkaSpout一直認為emitted的數據有問題, 超時之后進行數據重發
KafkaSpout中關鍵代碼如下:
PartitionManager.java
public void commit() { LOG.info("Committing offset for " + _partition); long committedTo; if(_pending.isEmpty()) { committedTo = _emittedToOffset; } else { committedTo = _pending.first(); } if(committedTo!=_committedTo) { LOG.info("Writing committed offset to ZK: " + committedTo); Map<Object, Object> data = (Map<Object,Object>)ImmutableMap.builder() .put("topology", ImmutableMap.of("id", _topologyInstanceId, "name", _stormConf.get(Config.TOPOLOGY_NAME))) .put("offset", committedTo) .put("partition", _partition.partition) .put("broker", ImmutableMap.of("host", _partition.host.host, "port", _partition.host.port)) .put("topic", _spoutConfig.topic).build(); _state.writeJSON(committedPath(), data); LOG.info("Wrote committed offset to ZK: " + committedTo); _committedTo = committedTo; } LOG.info("Committed offset " + committedTo + " for " + _partition); }
如果Bolt不進行ack, 則紅色代碼處的offsetNumber永遠相等, 導致一直不進行offset的回寫操作
解決方案:
1. IBolt中顯式調用collector.ack();
2. 使用幫你封裝好的BaseBasicBlot, 它會幫你自動調用ack的
關於Ack的問題, 可以參考我的翻譯和官網文章: http://www.cnblogs.com/zhwbqd/p/3960991.html