最近公司的zk的down掉了, storm job 重啟的時候報出 kafka.common.OffsetOutOfRangeException 異常
網上查詢了一些朋友的做法, 自己也看了一下代碼, 最終還是解決了
原因:
zk掛掉的這幾天, kafka中之前的數據已經被清掉了, 但是zk中保存的offset還是幾天之前的, 導致KafkaSpout要獲取的offset超過了當前kafka的offset, 就像ArrayIndexOutOfRangeException一樣
解決方案:
KafkaSpout 配置項中可以選擇讀取的方式, 共有三種, 如果Topology啟動的時候未進行配置, 則默認是從Zk中讀取, 所以導致了異常
-2: 從最老的開始讀
-1: 從最近的開始讀
0: 從Zk中讀
相關代碼如下, storm.kafka.PartitionManager,
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition);
_state = state;
_stormConf = stormConf;
String jsonTopologyId = null;
Long jsonOffset = null;
try {
Map<Object, Object> json = _state.readJSON(committedPath());
if(json != null) {
jsonTopologyId = (String)((Map<Object,Object>)json.get("topology")).get("id");
jsonOffset = (Long)json.get("offset");
}
}
catch(Throwable e) {
LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath(), e);
}
if(!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = _consumer.getOffsetsBefore(spoutConfig.topic, id.partition, spoutConfig.startOffsetTime, 1)[0];
LOG.info("Using startOffsetTime to choose last commit offset.");
} else if(jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = _consumer.getOffsetsBefore(spoutConfig.topic, id.partition, -1, 1)[0];
LOG.info("Setting last commit offset to HEAD.");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo);
}
LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
_emittedToOffset = _committedTo;
}
重點關注紅色代碼, spoutConfig.forceFromStart 為true的時候, 才會真正去讀取自己設置的offset, 否則將會使用Zk中的offset
那么問題來了, 如何設置呢, SpoutConfig很貼心的給我們提供了一個方法
startOffsetTime = millis;
forceFromStart = true;
}
所以我們只需要在我們的Topology中添加如下代碼即可
if (args != null && args[1] != null && Integer.valueOf(args[1]) != 0) {
if (Integer.valueOf(args[1]) == -2) {
spoutConfig.forceStartOffsetTime(-2); // 從kafka最老的記錄讀取
} else if (Integer.valueOf(args[1]) == -1) {
spoutConfig.forceStartOffsetTime(-1); // 從kafka最新的記錄讀取
} // 其他情況則默認從zk的offset讀取
}
發布Topology的時候, 如果需要從最新記錄讀取, 則像這樣 storm jar com.abc.StormTopology stormTopology -1
其他鏈接: http://blog.csdn.net/baiyangfu_love/article/details/8919699