Storm累計求和中使用各種分組Grouping


 

       Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple, 保證bolt中的每個任務接收到的tuple數目相同.(它能實現較好的負載均衡)

       Fields Grouping:按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到同一任務, 而不同的userid則會被分配到不同的任務

       All Grouping: 廣播發送,對於每一個tuple,Bolts中的所有任務都會收到.

       Global Grouping: 全局分組,這個tuple被分配到storm中的一個bolt的其中一個task.再具體一點就是分配給id值最低的那個task.

       Non Grouping: 隨機分派,意思是說stream不關心到底誰會收到它的tuple.目前他和Shuffle grouping是一樣的效果,

       Direct Grouping: 直接分組,這是一種比較特別的分組方法,用這種分組意味着消息的發送者具體由消息接收者的哪個task處理這個消息.只有被聲明為Direct Stream的消息流可以聲明這種分組方法.而且這種消息tuple必須使用emitDirect方法來發射.消息處理者可以通過TopologyContext來或者處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)

 Fields Grouping 的代碼

  1 /**
  2  * 數字累加求和
  3  * 先添加storm依賴
  4  */
  5 public class LocalTopologySumFieldsGrouping {
  6     /**
  7      * spout需要繼承baserichspout,實現未實現的方法
  8      * @author Administrator
  9      *
 10      */
 11     public static class MySpout extends BaseRichSpout{
 12         private Map conf;
 13         private TopologyContext context;
 14         private SpoutOutputCollector collector;
 15         
 16         /**
 17          * 初始化方法,只會執行一次
 18          * 在這里面可以寫一個初始化的代碼
 19          * Map conf:其實里面保存的是topology的一些配置信息
 20          * TopologyContext context:topology的上下文,類似於servletcontext
 21          * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple)
 22          */
 23         @Override
 24         public void open(Map conf, TopologyContext context,
 25                 SpoutOutputCollector collector) {
 26             this.conf = conf;
 27             this.context = context;
 28             this.collector = collector;
 29         }
 30 
 31         int num = 1;
 32         /**
 33          * 這個方法是spout中最重要的方法,
 34          * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內
 35          * 每調用一次,會向外發射一條數據
 36          */
 37         @Override
 38         public void nextTuple() {
 39             System.out.println("spout發射:"+num);
 40             //把數據封裝到values中,稱為一個tuple,發射出去
 41             this.collector.emit(new Values(num++,num%2));
 42             Utils.sleep(1000);
 43         }
 44         
 45         /**
 46          * 聲明輸出字段
 47          */
 48         @Override
 49         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 50             //給values中的數據起個名字,方便后面的bolt從這個values中取數據
 51             //fields中定義的參數和values中傳遞的數值是一一對應的
 52             declarer.declare(new Fields("num","flag"));
 53         }
 54     }
 55     
 56     /**
 57      * 自定義bolt需要實現baserichbolt
 58      * @author Administrator
 59      *
 60      */
 61     public static class MyBolt extends BaseRichBolt{
 62         private Map stormConf; 
 63         private TopologyContext context;
 64         private OutputCollector collector;
 65         
 66         /**
 67          * 和spout中的open方法意義一樣
 68          */
 69         @Override
 70         public void prepare(Map stormConf, TopologyContext context,
 71                 OutputCollector collector) {
 72             this.stormConf = stormConf;
 73             this.context = context;
 74             this.collector = collector;
 75         }
 76 
 77         int sum = 0;
 78         /**
 79          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
 80          */
 81         @Override
 82         public void execute(Tuple input) {
 83             //input.getInteger(0);//也可以根據角標獲取tuple中的數據
 84             Integer value = input.getIntegerByField("num");
 85             System.out.println("線程id:"+Thread.currentThread().getId()+",值:"+value);
 86             //sum+=value;
 87             //System.out.println("和:"+sum);
 88         }
 89         
 90         /**
 91          * 聲明輸出字段
 92          */
 93         @Override
 94         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 95             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
 96             //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
 97         }
 98         
 99     }
100     /**
101      * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的
102      * @param args
103      */
104     public static void main(String[] args) {
105         //組裝topology
106         TopologyBuilder topologyBuilder = new TopologyBuilder();
107         topologyBuilder.setSpout("spout1", new MySpout());
108         //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple
109         topologyBuilder.setBolt("bolt1", new MyBolt(),3).fieldsGrouping("spout1", new Fields("flag"));
110         
111         //創建本地storm集群
112         LocalCluster localCluster = new LocalCluster();
113         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
114     }
115     
116 }

ShuffleGrouping代碼

  1 /**
  2  * 數字累加求和
  3  * 先添加storm依賴
  4  */
  5 public class LocalTopologySumShufferGrouping {
  6     /**
  7      * spout需要繼承baserichspout,實現未實現的方法
  8      * @author Administrator
  9      *
 10      */
 11     public static class MySpout extends BaseRichSpout{
 12         private Map conf;
 13         private TopologyContext context;
 14         private SpoutOutputCollector collector;
 15         
 16         /**
 17          * 初始化方法,只會執行一次
 18          * 在這里面可以寫一個初始化的代碼
 19          * Map conf:其實里面保存的是topology的一些配置信息
 20          * TopologyContext context:topology的上下文,類似於servletcontext
 21          * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple)
 22          */
 23         @Override
 24         public void open(Map conf, TopologyContext context,
 25                 SpoutOutputCollector collector) {
 26             this.conf = conf;
 27             this.context = context;
 28             this.collector = collector;
 29         }
 30 
 31         int num = 1;
 32         /**
 33          * 這個方法是spout中最重要的方法,
 34          * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內
 35          * 每調用一次,會向外發射一條數據
 36          */
 37         @Override
 38         public void nextTuple() {
 39             System.out.println("spout發射:"+num);
 40             //把數據封裝到values中,稱為一個tuple,發射出去
 41             this.collector.emit(new Values(num++));
 42             Utils.sleep(1000);
 43         }
 44         
 45         /**
 46          * 聲明輸出字段
 47          */
 48         @Override
 49         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 50             //給values中的數據起個名字,方便后面的bolt從這個values中取數據
 51             //fields中定義的參數和values中傳遞的數值是一一對應的
 52             declarer.declare(new Fields("num"));
 53         }
 54         
 55     }
 56     
 57     
 58     /**
 59      * 自定義bolt需要實現baserichbolt
 60      * @author Administrator
 61      *
 62      */
 63     public static class MyBolt extends BaseRichBolt{
 64         private Map stormConf; 
 65         private TopologyContext context;
 66         private OutputCollector collector;
 67         
 68         /**
 69          * 和spout中的open方法意義一樣
 70          */
 71         @Override
 72         public void prepare(Map stormConf, TopologyContext context,
 73                 OutputCollector collector) {
 74             this.stormConf = stormConf;
 75             this.context = context;
 76             this.collector = collector;
 77         }
 78 
 79         int sum = 0;
 80         /**
 81          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
 82          */
 83         @Override
 84         public void execute(Tuple input) {
 85             //input.getInteger(0);//也可以根據角標獲取tuple中的數據
 86             Integer value = input.getIntegerByField("num");
 87             System.out.println("線程id:"+Thread.currentThread().getId()+",值:"+value);//這樣可以知道哪個線程接收到這個數據了.
 88             //sum+=value;
 89             //System.out.println("和:"+sum);
 90         }
 91         
 92         /**
 93          * 聲明輸出字段
 94          */
 95         @Override
 96         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 97             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
 98             //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
 99         }
100         
101     }
102     /**
103      * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的
104      * @param args
105      */
106     public static void main(String[] args) {
107         //組裝topology
108         TopologyBuilder topologyBuilder = new TopologyBuilder();
109         topologyBuilder.setSpout("spout1", new MySpout());
110         //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple
111         topologyBuilder.setBolt("bolt1", new MyBolt(),3).globalGrouping("spout1");
112         
113         //創建本地storm集群
114         LocalCluster localCluster = new LocalCluster();
115         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
116     }
117 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM