Storm的acker消息確認機制...
ack/fail消息確認機制(確保一個tuple被完全處理)
在spout中發射tuple的時候需要同時發送messageid,這樣才相當於開啟了消息確認機制
如果你的topology里面的tuple比較多的話, 那么把acker的數量設置多一點,效率會高一點。
通過config.setNumAckers(num)來設置一個topology里面的acker的數量,默認值是1。
注意: acker用了特殊的算法,使得對於追蹤每個spout tuple的狀態所需要的內存量是恆定的(20 bytes)
注意:如果一個tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默認值為30秒)時間內沒有被成功處理,那么這個tuple會被認為處理失敗了。
下面代碼中Bolt的execute中模擬消息的正常和失敗.
1 import java.util.Map; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.spout.SpoutOutputCollector; 6 import backtype.storm.task.OutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.TopologyBuilder; 10 import backtype.storm.topology.base.BaseRichBolt; 11 import backtype.storm.topology.base.BaseRichSpout; 12 import backtype.storm.tuple.Fields; 13 import backtype.storm.tuple.Tuple; 14 import backtype.storm.tuple.Values; 15 import backtype.storm.utils.Utils; 16 17 /** 18 * 數字累加求和 19 * 先添加storm依賴 20 * 21 * @author Administrator 22 * 23 */ 24 public class LocalTopologySumAcker { 25 26 27 /** 28 * spout需要繼承baserichspout,實現未實現的方法 29 * @author Administrator 30 * 31 */ 32 public static class MySpout extends BaseRichSpout{ 33 private Map conf; 34 private TopologyContext context; 35 private SpoutOutputCollector collector; 36 37 /** 38 * 初始化方法,只會執行一次 39 * 在這里面可以寫一個初始化的代碼 40 * Map conf:其實里面保存的是topology的一些配置信息 41 * TopologyContext context:topology的上下文,類似於servletcontext 42 * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple) 43 */ 44 @Override 45 public void open(Map conf, TopologyContext context, 46 SpoutOutputCollector collector) { 47 this.conf = conf; 48 this.context = context; 49 this.collector = collector; 50 } 51 52 int num = 1; 53 /** 54 * 這個方法是spout中最重要的方法, 55 * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內 56 * 每調用一次,會向外發射一條數據 57 */ 58 @Override 59 public void nextTuple() { 60 System.out.println("spout發射:"+num); 61 //把數據封裝到values中,稱為一個tuple,發射出去 62 //messageid:和tuple需要是一一對應的,可以把messageid認為是數據的主鍵id,而tuple中的內容就是這個數據. 63 //messageid和tuple中的消息是一一對應的. 它們之間的關系是需要我們程序員來維護的. 64 //this.collector.emit(new Values(num++)); 65 this.collector.emit(new Values(num++),num-1);//傳遞messageid(num-1)參數就表示開啟了消息確認機制. 66 Utils.sleep(1000); 67 } 68 69 @Override 70 public void ack(Object msgId) { 71 System.out.println("處理成功"); 72 } 73 74 @Override 75 public void fail(Object msgId) { 76 System.out.println("處理失敗....."+msgId); 77 //TODO--可以選擇把失敗的數據重發,或者單獨存儲后期進行分析 78 //重發的方法...this.collector.emit(tuple);//這個tuple可以根據參數msgId來獲得... 79 } 80 81 /** 82 * 聲明輸出字段 83 */ 84 @Override 85 public void declareOutputFields(OutputFieldsDeclarer declarer) { 86 //給values中的數據起個名字,方便后面的bolt從這個values中取數據 87 //fields中定義的參數和values中傳遞的數值是一一對應的 88 declarer.declare(new Fields("num")); 89 } 90 91 } 92 93 94 /** 95 * 自定義bolt需要實現baserichbolt 96 * @author Administrator 97 * 98 */ 99 public static class MyBolt extends BaseRichBolt{ 100 private Map stormConf; 101 private TopologyContext context; 102 private OutputCollector collector; 103 104 /** 105 * 和spout中的open方法意義一樣 106 */ 107 @Override 108 public void prepare(Map stormConf, TopologyContext context, 109 OutputCollector collector) { 110 this.stormConf = stormConf; 111 this.context = context; 112 this.collector = collector; 113 } 114 115 int sum = 0; 116 /** 117 * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理 118 */ 119 @Override 120 public void execute(Tuple input) { 121 try{ 122 //input.getInteger(0);//也可以根據角標獲取tuple中的數據 123 Integer value = input.getIntegerByField("num"); 124 if(value == 3){ 125 throw new Exception("value=3異常....."); 126 } 127 sum+=value; 128 System.out.println("和:"+sum); 129 this.collector.ack(input);//這個表示確認消息處理成功,spout中的ack方法會被調用 130 }catch(Exception e) { 131 this.collector.fail(input);//這個表示確認消息處理失敗,spout中的fail方法會被調用 132 e.printStackTrace(); 133 } 134 } 135 136 /** 137 * 聲明輸出字段 138 */ 139 @Override 140 public void declareOutputFields(OutputFieldsDeclarer declarer) { 141 //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。 142 //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 143 } 144 145 } 146 /** 147 * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的 148 * @param args 149 */ 150 public static void main(String[] args) { 151 //組裝topology 152 TopologyBuilder topologyBuilder = new TopologyBuilder(); 153 topologyBuilder.setSpout("spout1", new MySpout()); 154 //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple 155 topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1"); 156 157 //創建本地storm集群 158 LocalCluster localCluster = new LocalCluster(); 159 Config config = new Config(); 160 localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology()); 161 } 162 163 }
運行結果:
從結果可以看到Bolt1執行execute成功了通過ack 調用Spout1中的ack方法.... 失敗了就通過fail 調用Spout1中的fail 方法 來達到對消息處理成功與否的追蹤.
//=============================================================================================
上面的例子是一個Spout 和一個Bolt.....如果對應有1個Spout和2個Bolt 會是什么情況.....
改造上面的代碼.....
1 /** 2 * 數字累加求和 3 * 先添加storm依賴 4 * 5 * @author Administrator 6 * 7 */ 8 public class LocalTopologySumAcker2 { 9 10 11 /** 12 * spout需要繼承baserichspout,實現未實現的方法 13 * @author Administrator 14 * 15 */ 16 public static class MySpout extends BaseRichSpout{ 17 private Map conf; 18 private TopologyContext context; 19 private SpoutOutputCollector collector; 20 21 /** 22 * 初始化方法,只會執行一次 23 * 在這里面可以寫一個初始化的代碼 24 * Map conf:其實里面保存的是topology的一些配置信息 25 * TopologyContext context:topology的上下文,類似於servletcontext 26 * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple) 27 */ 28 @Override 29 public void open(Map conf, TopologyContext context, 30 SpoutOutputCollector collector) { 31 this.conf = conf; 32 this.context = context; 33 this.collector = collector; 34 } 35 36 int num = 1; 37 /** 38 * 這個方法是spout中最重要的方法, 39 * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內 40 * 每調用一次,會向外發射一條數據 41 */ 42 @Override 43 public void nextTuple() { 44 System.out.println("spout發射:"+num); 45 //把數據封裝到values中,稱為一個tuple,發射出去 46 //messageid:和tuple需要是一一對應的,可以把messageid認為是數據的主鍵id,而tuple中的內容就是這個數據 47 //messageid和tuple之間的關系是需要我們程序員維護的 48 this.collector.emit(new Values(num++),num-1);//傳遞messageid參數就表示開啟了消息確認機制 49 Utils.sleep(1000); 50 } 51 52 /** 53 * 聲明輸出字段 54 */ 55 @Override 56 public void declareOutputFields(OutputFieldsDeclarer declarer) { 57 //給values中的數據起個名字,方便后面的bolt從這個values中取數據 58 //fields中定義的參數和values中傳遞的數值是一一對應的 59 declarer.declare(new Fields("num")); 60 } 61 62 @Override 63 public void ack(Object msgId) { 64 System.out.println("處理成功"); 65 } 66 67 @Override 68 public void fail(Object msgId) { 69 System.out.println("處理失敗。。"+msgId); 70 //TODO--可以選擇吧失敗的數據重發,或者單獨存儲后期分析 71 } 72 } 73 74 75 /** 76 * 自定義bolt需要實現baserichbolt 77 * @author Administrator 78 * 79 */ 80 public static class MyBolt1 extends BaseRichBolt{ 81 private Map stormConf; 82 private TopologyContext context; 83 private OutputCollector collector; 84 85 /** 86 * 和spout中的open方法意義一樣 87 */ 88 @Override 89 public void prepare(Map stormConf, TopologyContext context, 90 OutputCollector collector) { 91 this.stormConf = stormConf; 92 this.context = context; 93 this.collector = collector; 94 } 95 96 int sum = 0; 97 /** 98 * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理 99 */ 100 @Override 101 public void execute(Tuple input) { 102 try { 103 //input.getInteger(0);//也可以根據角標獲取tuple中的數據 104 Integer value = input.getIntegerByField("num"); 105 this.collector.emit(new Values(value+"_1")); 106 //this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1") 老的tuple是input 107 this.collector.ack(input);//確認數據處理成功,spout中的ack方法會被調用 108 } catch (Exception e) { 109 this.collector.fail(input);//確認數據處理失敗,spout中的fail方法會被調用 110 e.printStackTrace(); 111 } 112 } 113 114 /** 115 * 聲明輸出字段 116 */ 117 @Override 118 public void declareOutputFields(OutputFieldsDeclarer declarer) { 119 //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。 120 //如果nextT|uple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 121 declarer.declare(new Fields("num_1")); 122 } 123 124 } 125 126 public static class MyBolt2 extends BaseRichBolt{ 127 private Map stormConf; 128 private TopologyContext context; 129 private OutputCollector collector; 130 131 /** 132 * 和spout中的open方法意義一樣 133 */ 134 @Override 135 public void prepare(Map stormConf, TopologyContext context, 136 OutputCollector collector) { 137 this.stormConf = stormConf; 138 this.context = context; 139 this.collector = collector; 140 } 141 142 int sum = 0; 143 /** 144 * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理 145 */ 146 @Override 147 public void execute(Tuple input) { 148 try { 149 //input.getInteger(0);//也可以根據角標獲取tuple中的數據 150 String value = input.getStringByField("num_1"); 151 System.out.println(value); 152 this.collector.fail(input);//確認數據處理成功,spout中的ack方法會被調用 153 //this.collector.ack(input);//確認數據處理成功,spout中的ack方法會被調用 154 } catch (Exception e) { 155 //this.collector.fail(input);//確認數據處理失敗,spout中的fail方法會被調用 156 e.printStackTrace(); 157 } 158 } 159 160 /** 161 * 聲明輸出字段 162 */ 163 @Override 164 public void declareOutputFields(OutputFieldsDeclarer declarer) { 165 //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。 166 //如果nextT|uple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 167 declarer.declare(new Fields("num_1")); 168 } 169 170 } 171 172 /** 173 * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的 174 * @param args 175 */ 176 public static void main(String[] args) { 177 //組裝topology 178 TopologyBuilder topologyBuilder = new TopologyBuilder(); 179 topologyBuilder.setSpout("spout1", new MySpout()); 180 //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple 181 topologyBuilder.setBolt("bolt1", new MyBolt1()).shuffleGrouping("spout1"); 182 topologyBuilder.setBolt("bolt2", new MyBolt2()).shuffleGrouping("bolt1"); 183 184 //創建本地storm集群 185 LocalCluster localCluster = new LocalCluster(); 186 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology()); 187 } 188 }
上面的代碼的大體意思是 Bolt1接收Spout1的輸出,接收之后在數據后面加上"_1",然后發送給Bolt2,Bolt2接收到之后直接打印.
在spout2中的execute()方法不管成功還是失敗 都調用 this.collector.fail(input); 方法....也就是Spout1發射的數據在Bolt1中處理都成功了,在Bolt2中的處理都失敗了.
看Spout1中的哪個方法會被執行.....也就是Spout2中調用的ack或者是fail對tuple的處理狀態結果是否有影響.
運行看結果:
可以看出都是成功的...這就說明tuple的處理狀態和Bolt2中ack或者是fail是沒有任何的關系的......只要Bolt1中處理tuple成功了,我們就認為是處理成功了...
如果Bolt1處理失敗了就認為是處理失敗了.. ...現在Bolt1中發射出去的tuple是無法追蹤的.....
能不能在Bolt1發射的數據中也加上一個messageid...這個在Bolt中的 this.collector.emit(new Values(value+"_1")); emit方法中是不支持傳入一個messageid的.
但是這樣有一種場景是有問題的. 單詞計數的例子:
這個Spout后面有兩個Bolt 一個SplitBolt 一個CountBolt SplitBolt 切割成一個個的單詞 然后再CountBolt中進行匯總....
按照上面在SplitBolt中切割成功了,就算處理成功了...但是有可能切割之后 在CountBolt中有一些Bolt沒有收到. 這樣最后其實是沒有成功的...
而且SpiltBolt中處理的tuple和CountBolt中的tuple之間是有關聯的. 后者是在前者之上切割出來的小tuple....
我們想達到兩個Bolt都處理成功了才認為是處理成功的...如何做?
上面的代碼中已經包括......這里再說明一下:
Spout1中 的 this.collector.emit(input,new Values(value+"_1")); ----> this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1") 老的tuple是input
在Spout2中還是不管是否異常都調用.. this.collector.fail(input);
看運行結果:
運行都失敗了........
這樣就達到了上面的"完全處理"的要求....
完全處理:保證一個tuple以及這個tuple衍生的所有tuple都被成功處理.
在storm里面一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所衍生的所有的tuple都被成功處理。
如果把Bolt2的正常對應改為 this.collector.ack(input); 失敗對應 this.collector.fail(input);就回復正常了.....
如果Spout2后面還有Spout3 同樣把老的tuple在emit上帶上.........