I've recently been building a monitoring system that tracks the call volume and processing time of each business function on our site, so that problems can be spotted and handled quickly. For this kind of real-time statistics work, Storm was the obvious choice, so I learned it as I went along, and naturally fell into a few pits, many of which were hard to find answers to online. This post records the error that caused me the most grief.
My business logic aggregates call counts per minute, so I ran a timer inside a bolt that periodically sends the aggregated statistics to the next bolt for persistence. That means I was calling the OutputCollector's emit from the timer's thread. Local debugging showed no problems, so I deployed to the external test environment. Most of the time it ran fine, but occasionally this error appeared, and as a developer the most hateful kind of bug is exactly this: one with a very low reproduction rate.
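The counting side of that per-minute aggregation can be sketched as follows. This is a plain-Java illustration only; the class and method names are my own invention, not the production code, and in the real topology the snapshot taken each minute would be emitted to the next bolt:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch: per-minute call counters that a timer task can
// snapshot and reset, while execute() keeps recording new calls.
public class MinuteStats {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Called from the bolt's execute() for every incoming tuple.
    public void record(String api) {
        counts.computeIfAbsent(api, k -> new LongAdder()).increment();
    }

    // Called once a minute by the timer thread; returns the finished
    // window's totals and starts a fresh window.
    public Map<String, Long> snapshotAndReset() {
        Map<String, Long> window = new HashMap<>();
        for (String api : counts.keySet()) {
            LongAdder adder = counts.remove(api);
            if (adder != null) {
                window.put(api, adder.sum());
            }
        }
        return window;
    }
}
```

Note that the counting itself is thread-safe here; the trouble described below comes from what the timer thread does with the snapshot afterwards, namely calling emit.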
Here is the error log:
5675 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]
Caused by: java.lang.NullPointerException: null
at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
... 6 common frames omitted
5697 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]
If you hit this problem too, the first sight of this error is probably painful: the log contains nothing that points to your own business code, so there is simply nowhere to start. Worse still, it is not easy to reproduce.
After many rounds of guessing and experimenting, I finally pinned down the cause. First, here is a minimal example that reproduces the error:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class Main {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new TestWordSpout());
        builder.setBolt("dispatch", new WordDispatchBolt()).shuffleGrouping("spout");
        builder.setBolt("print", new PrintBolt()).fieldsGrouping("dispatch", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(1);
        //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test-kafka-1", conf, builder.createTopology());
    }
}
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class TestWordSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    boolean _isDistributed;
    SpoutOutputCollector _collector;
    String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {
    }

    public void nextTuple() {
        Utils.sleep(1000);
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        // Emit with a message id so the ack/fail callbacks fire.
        _collector.emit(new Values(word), word + new Random().nextDouble());
    }

    public void ack(Object msgId) {
        System.out.println("### ack:" + msgId);
    }

    public void fail(Object msgId) {
        System.out.println("### fail:" + msgId);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
import java.util.Map;
import java.util.Random;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordDispatchBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // No sleep here: with a pause, the exception becomes
                    // too rare to observe.
                    send();
                }
            }
        }).start();
    }

    public void send() {
        this.collector.emit(new Values(new Random().nextDouble()));
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        this.collector.emit(new Values(word));
        this.collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(input.getValue(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
The code is simple, so I won't walk through it in detail. In WordDispatchBolt I start another thread that emits data to the next bolt. My real business code does something similar, sending data on a schedule via a Timer (a Timer is backed by its own thread as well, so the situation is the same). But since the Timer fires only once a minute, the chance of hitting the problem was vanishingly small; here I deliberately emit in a tight loop with no pause so that the exception occurs far more often.
If you run this example, you will certainly hit the error shown earlier. If you don't know that the root cause is unsynchronized use of OutputCollector, debugging this is pure agony. Once you know it is a synchronization issue, the fix is either to avoid calling the collector from other threads altogether, or to make the calls synchronized. Below is the simple solution I came up with (if anyone has a better one, please leave a comment).
Modify WordDispatchBolt as follows:
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordDispatchBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // No sleep here: with a pause, the exception becomes
                    // too rare to observe.
                    send(new Values(new Random().nextDouble()));
                }
            }
        }).start();
    }

    // All emits go through this synchronized method, so the collector
    // is never called from two threads at once.
    public synchronized void send(List<Object> tuple) {
        this.collector.emit(tuple);
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        send(new Values(word));
        this.collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
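The other option mentioned above, which I find cleaner, is to avoid synchronization entirely: the background thread never touches the collector at all and only pushes data onto a thread-safe queue, which is then drained and emitted from the executor's own thread (in Storm, from inside execute(), or on a schedule via tick tuples). Below is a self-contained plain-Java sketch of that hand-off pattern; the toy Collector class is a stand-in for OutputCollector, invented only for this demo:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class HandOffDemo {
    // Stand-in for Storm's OutputCollector, for illustration only:
    // assume it is NOT thread-safe, so only one thread may call emit().
    static class Collector {
        final List<Object> emitted = new ArrayList<>();
        void emit(Object value) { emitted.add(value); }
    }

    static int runDemo() throws InterruptedException {
        final LinkedBlockingQueue<Object> pending = new LinkedBlockingQueue<>();
        Collector collector = new Collector();

        // Background "timer" thread: only enqueues, never emits.
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    pending.offer(i);
                }
            }
        });
        producer.start();
        producer.join();

        // Executor thread (in Storm, this drain would sit inside
        // execute()), so every emit happens on one thread only.
        Object value;
        while ((value = pending.poll()) != null) {
            collector.emit(value);
        }
        return collector.emitted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

The trade-off is a little extra latency (queued data waits until the next tuple or tick arrives), but the collector is provably confined to a single thread.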
That basically takes care of this pit. I expect to keep using Storm heavily, and I'll write up further pitfalls as I hit them.
"Write down the pits you fall into, so that those who come after have more to find online, and spend less time and anguish troubleshooting."
