Storm的acker確認機制


 Storm的acker消息確認機制...

       ack/fail消息確認機制(確保一個tuple被完全處理)

       在spout中發射tuple的時候需要同時發送messageid,這樣才相當於開啟了消息確認機制

        如果你的topology里面的tuple比較多的話, 那么把acker的數量設置多一點,效率會高一點。

       通過config.setNumAckers(num)來設置一個topology里面的acker的數量,默認值是1。

       注意: acker用了特殊的算法,使得對於追蹤每個spout tuple的狀態所需要的內存量是恆定的(20 bytes)

       注意:如果一個tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默認值為30秒)時間內沒有被成功處理,那么這個tuple會被認為處理失敗了。

下面代碼中Bolt的execute中模擬消息的正常和失敗.

  1 import java.util.Map;
  2 
  3 import backtype.storm.Config;
  4 import backtype.storm.LocalCluster;
  5 import backtype.storm.spout.SpoutOutputCollector;
  6 import backtype.storm.task.OutputCollector;
  7 import backtype.storm.task.TopologyContext;
  8 import backtype.storm.topology.OutputFieldsDeclarer;
  9 import backtype.storm.topology.TopologyBuilder;
 10 import backtype.storm.topology.base.BaseRichBolt;
 11 import backtype.storm.topology.base.BaseRichSpout;
 12 import backtype.storm.tuple.Fields;
 13 import backtype.storm.tuple.Tuple;
 14 import backtype.storm.tuple.Values;
 15 import backtype.storm.utils.Utils;
 16 
 17 /**
 18  * 數字累加求和
 19  * 先添加storm依賴
 20  * 
 21  * @author Administrator
 22  *
 23  */
 24 public class LocalTopologySumAcker {
 25     
 26     
 27     /**
 28      * spout需要繼承baserichspout,實現未實現的方法
 29      * @author Administrator
 30      *
 31      */
 32     public static class MySpout extends BaseRichSpout{
 33         private Map conf;
 34         private TopologyContext context;
 35         private SpoutOutputCollector collector;
 36         
 37         /**
 38          * 初始化方法,只會執行一次
 39          * 在這里面可以寫一個初始化的代碼
 40          * Map conf:其實里面保存的是topology的一些配置信息
 41          * TopologyContext context:topology的上下文,類似於servletcontext
 42          * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple)
 43          */
 44         @Override
 45         public void open(Map conf, TopologyContext context,
 46                 SpoutOutputCollector collector) {
 47             this.conf = conf;
 48             this.context = context;
 49             this.collector = collector;
 50         }
 51 
 52         int num = 1;
 53         /**
 54          * 這個方法是spout中最重要的方法,
 55          * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內
 56          * 每調用一次,會向外發射一條數據
 57          */
 58         @Override
 59         public void nextTuple() {
 60             System.out.println("spout發射:"+num);
 61             //把數據封裝到values中,稱為一個tuple,發射出去
 62             //messageid:和tuple需要是一一對應的,可以把messageid認為是數據的主鍵id,而tuple中的內容就是這個數據.
 63             //messageid和tuple中的消息是一一對應的. 它們之間的關系是需要我們程序員來維護的.
 64             //this.collector.emit(new Values(num++));
 65             this.collector.emit(new Values(num++),num-1);//傳遞messageid(num-1)參數就表示開啟了消息確認機制.
 66             Utils.sleep(1000);
 67         }
 68         
 69         @Override
 70         public void ack(Object msgId) {
 71             System.out.println("處理成功");
 72         }
 73         
 74         @Override
 75         public void fail(Object msgId) {
 76             System.out.println("處理失敗....."+msgId);
 77             //TODO--可以選擇把失敗的數據重發,或者單獨存儲后期進行分析
 78             //重發的方法...this.collector.emit(tuple);//這個tuple可以根據參數msgId來獲得...
 79         }
 80         
 81         /**
 82          * 聲明輸出字段
 83          */
 84         @Override
 85         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 86             //給values中的數據起個名字,方便后面的bolt從這個values中取數據
 87             //fields中定義的參數和values中傳遞的數值是一一對應的
 88             declarer.declare(new Fields("num"));
 89         }
 90         
 91     }
 92     
 93     
 94     /**
 95      * 自定義bolt需要實現baserichbolt
 96      * @author Administrator
 97      *
 98      */
 99     public static class MyBolt extends BaseRichBolt{
100         private Map stormConf; 
101         private TopologyContext context;
102         private OutputCollector collector;
103         
104         /**
105          * 和spout中的open方法意義一樣
106          */
107         @Override
108         public void prepare(Map stormConf, TopologyContext context,
109                 OutputCollector collector) {
110             this.stormConf = stormConf;
111             this.context = context;
112             this.collector = collector;
113         }
114 
115         int sum = 0;
116         /**
117          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
118          */
119         @Override
120         public void execute(Tuple input) {
121             try{
122                 //input.getInteger(0);//也可以根據角標獲取tuple中的數據
123                 Integer value = input.getIntegerByField("num");
124                 if(value == 3){
125                     throw new Exception("value=3異常.....");
126                 }
127                 sum+=value;
128                 System.out.println("和:"+sum);
129                 this.collector.ack(input);//這個表示確認消息處理成功,spout中的ack方法會被調用
130             }catch(Exception e) {
131                 this.collector.fail(input);//這個表示確認消息處理失敗,spout中的fail方法會被調用
132                 e.printStackTrace();
133             }
134         }
135         
136         /**
137          * 聲明輸出字段
138          */
139         @Override
140         public void declareOutputFields(OutputFieldsDeclarer declarer) {
141             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
142             //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
143         }
144         
145     }
146     /**
147      * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的
148      * @param args
149      */
150     public static void main(String[] args) {
151         //組裝topology
152         TopologyBuilder topologyBuilder = new TopologyBuilder();
153         topologyBuilder.setSpout("spout1", new MySpout());
154         //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple
155         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
156         
157         //創建本地storm集群
158         LocalCluster localCluster = new LocalCluster();
159         Config config = new Config();
160         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
161     }
162 
163 }

 運行結果:

從結果可以看到Bolt1執行execute成功了通過ack  調用Spout1中的ack方法....   失敗了就通過fail 調用Spout1中的fail 方法 來達到對消息處理成功與否的追蹤.

//=============================================================================================

上面的例子是一個Spout 和一個Bolt.....如果對應有1個Spout和2個Bolt 會是什么情況.....

改造上面的代碼.....

  1 /**
  2  * 數字累加求和
  3  * 先添加storm依賴
  4  * 
  5  * @author Administrator
  6  *
  7  */
  8 public class LocalTopologySumAcker2 {
  9     
 10     
 11     /**
 12      * spout需要繼承baserichspout,實現未實現的方法
 13      * @author Administrator
 14      *
 15      */
 16     public static class MySpout extends BaseRichSpout{
 17         private Map conf;
 18         private TopologyContext context;
 19         private SpoutOutputCollector collector;
 20         
 21         /**
 22          * 初始化方法,只會執行一次
 23          * 在這里面可以寫一個初始化的代碼
 24          * Map conf:其實里面保存的是topology的一些配置信息
 25          * TopologyContext context:topology的上下文,類似於servletcontext
 26          * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple)
 27          */
 28         @Override
 29         public void open(Map conf, TopologyContext context,
 30                 SpoutOutputCollector collector) {
 31             this.conf = conf;
 32             this.context = context;
 33             this.collector = collector;
 34         }
 35 
 36         int num = 1;
 37         /**
 38          * 這個方法是spout中最重要的方法,
 39          * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內
 40          * 每調用一次,會向外發射一條數據
 41          */
 42         @Override
 43         public void nextTuple() {
 44             System.out.println("spout發射:"+num);
 45             //把數據封裝到values中,稱為一個tuple,發射出去
 46             //messageid:和tuple需要是一一對應的,可以把messageid認為是數據的主鍵id,而tuple中的內容就是這個數據
 47             //messageid和tuple之間的關系是需要我們程序員維護的
 48             this.collector.emit(new Values(num++),num-1);//傳遞messageid參數就表示開啟了消息確認機制
 49             Utils.sleep(1000);
 50         }
 51         
 52         /**
 53          * 聲明輸出字段
 54          */
 55         @Override
 56         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 57             //給values中的數據起個名字,方便后面的bolt從這個values中取數據
 58             //fields中定義的參數和values中傳遞的數值是一一對應的
 59             declarer.declare(new Fields("num"));
 60         }
 61 
 62         @Override
 63         public void ack(Object msgId) {
 64             System.out.println("處理成功");
 65         }
 66 
 67         @Override
 68         public void fail(Object msgId) {
 69             System.out.println("處理失敗。。"+msgId);
 70             //TODO--可以選擇吧失敗的數據重發,或者單獨存儲后期分析
 71         }
 72     }
 73     
 74     
 75     /**
 76      * 自定義bolt需要實現baserichbolt
 77      * @author Administrator
 78      *
 79      */
 80     public static class MyBolt1 extends BaseRichBolt{
 81         private Map stormConf; 
 82         private TopologyContext context;
 83         private OutputCollector collector;
 84         
 85         /**
 86          * 和spout中的open方法意義一樣
 87          */
 88         @Override
 89         public void prepare(Map stormConf, TopologyContext context,
 90                 OutputCollector collector) {
 91             this.stormConf = stormConf;
 92             this.context = context;
 93             this.collector = collector;
 94         }
 95 
 96         int sum = 0;
 97         /**
 98          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
 99          */
100         @Override
101         public void execute(Tuple input) {
102             try {
103                 //input.getInteger(0);//也可以根據角標獲取tuple中的數據
104                 Integer value = input.getIntegerByField("num");
105                 this.collector.emit(new Values(value+"_1"));
106                 //this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1")  老的tuple是input
107                 this.collector.ack(input);//確認數據處理成功,spout中的ack方法會被調用
108             } catch (Exception e) {
109                 this.collector.fail(input);//確認數據處理失敗,spout中的fail方法會被調用
110                 e.printStackTrace();
111             }
112         }
113         
114         /**
115          * 聲明輸出字段
116          */
117         @Override
118         public void declareOutputFields(OutputFieldsDeclarer declarer) {
119             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
120             //如果nextT|uple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
121             declarer.declare(new Fields("num_1"));
122         }
123         
124     }
125     
126     public static class MyBolt2 extends BaseRichBolt{
127         private Map stormConf; 
128         private TopologyContext context;
129         private OutputCollector collector;
130         
131         /**
132          * 和spout中的open方法意義一樣
133          */
134         @Override
135         public void prepare(Map stormConf, TopologyContext context,
136                 OutputCollector collector) {
137             this.stormConf = stormConf;
138             this.context = context;
139             this.collector = collector;
140         }
141 
142         int sum = 0;
143         /**
144          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
145          */
146         @Override
147         public void execute(Tuple input) {
148             try {
149                 //input.getInteger(0);//也可以根據角標獲取tuple中的數據
150                 String value = input.getStringByField("num_1");
151                 System.out.println(value);
152                 this.collector.fail(input);//確認數據處理成功,spout中的ack方法會被調用
153                 //this.collector.ack(input);//確認數據處理成功,spout中的ack方法會被調用
154             } catch (Exception e) {
155                 //this.collector.fail(input);//確認數據處理失敗,spout中的fail方法會被調用
156                 e.printStackTrace();
157             }
158         }
159         
160         /**
161          * 聲明輸出字段
162          */
163         @Override
164         public void declareOutputFields(OutputFieldsDeclarer declarer) {
165             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
166             //如果nextT|uple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
167             declarer.declare(new Fields("num_1"));
168         }
169         
170     }
171     
172     /**
173      * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的
174      * @param args
175      */
176     public static void main(String[] args) {
177         //組裝topology
178         TopologyBuilder topologyBuilder = new TopologyBuilder();
179         topologyBuilder.setSpout("spout1", new MySpout());
180         //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple
181         topologyBuilder.setBolt("bolt1", new MyBolt1()).shuffleGrouping("spout1");
182         topologyBuilder.setBolt("bolt2", new MyBolt2()).shuffleGrouping("bolt1");
183         
184         //創建本地storm集群
185         LocalCluster localCluster = new LocalCluster();
186         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
187     }
188 }

 

上面的代碼的大體意思是 Bolt1接收Spout1的輸出,接收之后在數據后面加上"_1",然后發送給Bolt2,Bolt2接收到之后直接打印.

在spout2中的execute()方法不管成功還是失敗 都調用   this.collector.fail(input);  方法....也就是Spout1發射的數據在Bolt1中處理都成功了,在Bolt2中的處理都失敗了.

看Spout1中的哪個方法會被執行.....也就是Spout2中調用的ack或者是fail對tuple的處理狀態結果是否有影響.

運行看結果:

可以看出都是成功的...這就說明tuple的處理狀態和Bolt2中ack或者是fail是沒有任何的關系的......只要Bolt1中處理tuple成功了,我們就認為是處理成功了...

如果Bolt1處理失敗了就認為是處理失敗了.. ...現在Bolt1中發射出去的tuple是無法追蹤的.....

能不能在Bolt1發射的數據中也加上一個messageid...這個在Bolt中的   this.collector.emit(new Values(value+"_1"));  emit方法中是不支持傳入一個messageid的.

 

但是這樣有一種場景是有問題的.  單詞計數的例子:

這個Spout后面有兩個Bolt  一個SplitBolt 一個CountBolt    SplitBolt 切割成一個個的單詞  然后再CountBolt中進行匯總....

按照上面在SplitBolt中切割成功了,就算處理成功了...但是有可能切割之后 在CountBolt中有一些Bolt沒有收到. 這樣最后其實是沒有成功的...

而且SpiltBolt中處理的tuple和CountBolt中的tuple之間是有關聯的. 后者是在前者之上切割出來的小tuple....

我們想達到兩個Bolt都處理成功了才認為是處理成功的...如何做?

上面的代碼中已經包括......這里再說明一下:

Spout1中 的   this.collector.emit(input,new Values(value+"_1"));   ---->    this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1")  老的tuple是input
在Spout2中還是不管是否異常都調用.. this.collector.fail(input);

看運行結果:

 
        

運行都失敗了........

這樣就達到了上面的"完全處理"的要求....

完全處理:保證一個tuple以及這個tuple衍生的所有tuple都被成功處理.

在storm里面一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所衍生的所有的tuple都被成功處理。

 

如果把Bolt2的正常對應改為  this.collector.ack(input);  失敗對應 this.collector.fail(input);就回復正常了.....

 

如果Spout2后面還有Spout3  同樣把老的tuple在emit上帶上.........

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM