Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple, 保證bolt中的每個任務接收到的tuple數目相同.(它能實現較好的負載均衡)
Fields Grouping:按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到同一任務, 而不同的userid則會被分配到不同的任務
All Grouping: 廣播發送,對於每一個tuple,Bolts中的所有任務都會收到.
Global Grouping: 全局分組,這個tuple被分配到storm中的一個bolt的其中一個task.再具體一點就是分配給id值最低的那個task.
Non Grouping: 隨機分派,意思是說stream不關心到底誰會收到它的tuple.目前他和Shuffle grouping是一樣的效果,
Direct Grouping: 直接分組,這是一種比較特別的分組方法,用這種分組意味着消息的發送者具體由消息接收者的哪個task處理這個消息.只有被聲明為Direct Stream的消息流可以聲明這種分組方法.而且這種消息tuple必須使用emitDirect方法來發射.消息處理者可以通過TopologyContext來或者處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)
Fields Grouping 的代碼
1 /** 2 * 數字累加求和 3 * 先添加storm依賴 4 */ 5 public class LocalTopologySumFieldsGrouping { 6 /** 7 * spout需要繼承baserichspout,實現未實現的方法 8 * @author Administrator 9 * 10 */ 11 public static class MySpout extends BaseRichSpout{ 12 private Map conf; 13 private TopologyContext context; 14 private SpoutOutputCollector collector; 15 16 /** 17 * 初始化方法,只會執行一次 18 * 在這里面可以寫一個初始化的代碼 19 * Map conf:其實里面保存的是topology的一些配置信息 20 * TopologyContext context:topology的上下文,類似於servletcontext 21 * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple) 22 */ 23 @Override 24 public void open(Map conf, TopologyContext context, 25 SpoutOutputCollector collector) { 26 this.conf = conf; 27 this.context = context; 28 this.collector = collector; 29 } 30 31 int num = 1; 32 /** 33 * 這個方法是spout中最重要的方法, 34 * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內 35 * 每調用一次,會向外發射一條數據 36 */ 37 @Override 38 public void nextTuple() { 39 System.out.println("spout發射:"+num); 40 //把數據封裝到values中,稱為一個tuple,發射出去 41 this.collector.emit(new Values(num++,num%2)); 42 Utils.sleep(1000); 43 } 44 45 /** 46 * 聲明輸出字段 47 */ 48 @Override 49 public void declareOutputFields(OutputFieldsDeclarer declarer) { 50 //給values中的數據起個名字,方便后面的bolt從這個values中取數據 51 //fields中定義的參數和values中傳遞的數值是一一對應的 52 declarer.declare(new Fields("num","flag")); 53 } 54 } 55 56 /** 57 * 自定義bolt需要實現baserichbolt 58 * @author Administrator 59 * 60 */ 61 public static class MyBolt extends BaseRichBolt{ 62 private Map stormConf; 63 private TopologyContext context; 64 private OutputCollector collector; 65 66 /** 67 * 和spout中的open方法意義一樣 68 */ 69 @Override 70 public void prepare(Map stormConf, TopologyContext context, 71 OutputCollector collector) { 72 this.stormConf = stormConf; 73 this.context = context; 74 this.collector = collector; 75 } 76 77 int sum = 0; 78 /** 79 * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理 80 */ 81 @Override 82 public void execute(Tuple input) { 83 //input.getInteger(0);//也可以根據角標獲取tuple中的數據 84 Integer value = input.getIntegerByField("num"); 85 System.out.println("線程id:"+Thread.currentThread().getId()+",值:"+value); 86 //sum+=value; 87 //System.out.println("和:"+sum); 88 } 89 90 /** 91 * 聲明輸出字段 92 */ 93 @Override 94 public void declareOutputFields(OutputFieldsDeclarer declarer) { 95 //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。 96 //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 97 } 98 99 } 100 /** 101 * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的 102 * @param args 103 */ 104 public static void main(String[] args) { 105 //組裝topology 106 TopologyBuilder topologyBuilder = new TopologyBuilder(); 107 topologyBuilder.setSpout("spout1", new MySpout()); 108 //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple 109 topologyBuilder.setBolt("bolt1", new MyBolt(),3).fieldsGrouping("spout1", new Fields("flag")); 110 111 //創建本地storm集群 112 LocalCluster localCluster = new LocalCluster(); 113 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology()); 114 } 115 116 }
ShuffleGrouping代碼
1 /** 2 * 數字累加求和 3 * 先添加storm依賴 4 */ 5 public class LocalTopologySumShufferGrouping { 6 /** 7 * spout需要繼承baserichspout,實現未實現的方法 8 * @author Administrator 9 * 10 */ 11 public static class MySpout extends BaseRichSpout{ 12 private Map conf; 13 private TopologyContext context; 14 private SpoutOutputCollector collector; 15 16 /** 17 * 初始化方法,只會執行一次 18 * 在這里面可以寫一個初始化的代碼 19 * Map conf:其實里面保存的是topology的一些配置信息 20 * TopologyContext context:topology的上下文,類似於servletcontext 21 * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple) 22 */ 23 @Override 24 public void open(Map conf, TopologyContext context, 25 SpoutOutputCollector collector) { 26 this.conf = conf; 27 this.context = context; 28 this.collector = collector; 29 } 30 31 int num = 1; 32 /** 33 * 這個方法是spout中最重要的方法, 34 * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內 35 * 每調用一次,會向外發射一條數據 36 */ 37 @Override 38 public void nextTuple() { 39 System.out.println("spout發射:"+num); 40 //把數據封裝到values中,稱為一個tuple,發射出去 41 this.collector.emit(new Values(num++)); 42 Utils.sleep(1000); 43 } 44 45 /** 46 * 聲明輸出字段 47 */ 48 @Override 49 public void declareOutputFields(OutputFieldsDeclarer declarer) { 50 //給values中的數據起個名字,方便后面的bolt從這個values中取數據 51 //fields中定義的參數和values中傳遞的數值是一一對應的 52 declarer.declare(new Fields("num")); 53 } 54 55 } 56 57 58 /** 59 * 自定義bolt需要實現baserichbolt 60 * @author Administrator 61 * 62 */ 63 public static class MyBolt extends BaseRichBolt{ 64 private Map stormConf; 65 private TopologyContext context; 66 private OutputCollector collector; 67 68 /** 69 * 和spout中的open方法意義一樣 70 */ 71 @Override 72 public void prepare(Map stormConf, TopologyContext context, 73 OutputCollector collector) { 74 this.stormConf = stormConf; 75 this.context = context; 76 this.collector = collector; 77 } 78 79 int sum = 0; 80 /** 81 * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理 82 */ 83 @Override 84 public void execute(Tuple input) { 85 //input.getInteger(0);//也可以根據角標獲取tuple中的數據 86 Integer value = input.getIntegerByField("num"); 87 System.out.println("線程id:"+Thread.currentThread().getId()+",值:"+value);//這樣可以知道哪個線程接收到這個數據了. 88 //sum+=value; 89 //System.out.println("和:"+sum); 90 } 91 92 /** 93 * 聲明輸出字段 94 */ 95 @Override 96 public void declareOutputFields(OutputFieldsDeclarer declarer) { 97 //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。 98 //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 99 } 100 101 } 102 /** 103 * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的 104 * @param args 105 */ 106 public static void main(String[] args) { 107 //組裝topology 108 TopologyBuilder topologyBuilder = new TopologyBuilder(); 109 topologyBuilder.setSpout("spout1", new MySpout()); 110 //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple 111 topologyBuilder.setBolt("bolt1", new MyBolt(),3).globalGrouping("spout1"); 112 113 //創建本地storm集群 114 LocalCluster localCluster = new LocalCluster(); 115 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology()); 116 } 117 }