storm坑之---傳遞對象


  繼之前遇到的那個同步問題的坑之后(storm坑之---同步問題),最近對代碼又做了調整和重構,並且又遇到了另一個storm開發中應該值得警惕的坑。接下來說說這個坑的大體情況。

  在我的storm程序中,Abolt需要將數據封裝成一個對象同時發送給Bbolt和Cbolt各一份,Bbolt和Cbolt分別對對象做一定的處理后,更新到數據庫。在查看日志時,意外的發現有些數據是不正確的詭異的,我先是懷疑算法問題,但又發現有部分數據又是正確的。算法應該沒啥問題。糾結之下之后打印了更詳細的日志,通過觀察詭異數據的規律最后恍然大悟:肯定是Bbolt收到對象后對對象的修改影響到了Cbolt。在這里筆者幾乎可以肯定的是:當Bbolt和Cbolt運行在同一個進程中時。發送給Bbolt和Cbolt的對象他們是公用的。Bbolt的修改會影響到Cbolt,反之亦然。如果Bbolt和Cbolt不是同一進程,則沒有此影響。這就解釋了為什么有的數據正常有的異常。

  下面舉一個例子代碼測試一下:

拓撲構建類:

public class Main {

	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		
		builder.setSpout("test", new TestWordSpout());
		
		builder.setBolt("print1",new PrintBolt("PrintBolt1")).shuffleGrouping("test");
		
		builder.setBolt("print2",new PrintBolt("PrintBolt2")).shuffleGrouping("test");
		
		Config conf = new Config();
		conf.setDebug(false);
		conf.setNumWorkers(1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("test-kafka-1", conf, builder.createTopology());
	}

}

spout類:

public class TestWordSpout extends BaseRichSpout {
   
	private static final long serialVersionUID = 1L;
    SpoutOutputCollector _collector;
    
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
    public void nextTuple() {
        Utils.sleep(1000);
        Name name = new Name();
        name.setName("123");
        _collector.emit(new Values(name));
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

bolt類:

public class PrintBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;
	private String name;
	int taskid;

	public PrintBolt(String name){
		this.name = name;
	}
	
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.taskid = context.getThisTaskId();
		
	}

	@Override
	public void execute(Tuple input) {
		Name name = (Name) input.getValueByField("word");
		System.out.println(logPrefix()+name.getName());
		name.setName(this.name);
		
	}

	private String logPrefix(){
		return this.name+":";
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}

可能發生的執行結果:

PrintBolt2:123
PrintBolt1:123
PrintBolt2:123
PrintBolt1:123
PrintBolt2:123
PrintBolt1:123
PrintBolt2:PrintBolt1
PrintBolt2:123
PrintBolt1:123
PrintBolt1:123
PrintBolt2:123
PrintBolt1:123
PrintBolt2:123

  從上邊結果可以看到,PrintBolt2打印了PrintBolt1的修改。

  了解了這個情況,以后寫代碼就得要考慮到這種意外。如果一個對象會同時發送給兩個bolt來處理,切bolt都要對此對象進行修改,在做修改之前一定要克隆一份,而不要直接修改!

  

 

 

 

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM