Storm's grouping strategies
- Shuffle grouping: tuples are distributed randomly across the bolt's tasks, so that each of the bolt's tasks receives roughly the same number of tuples.
- Fields grouping: the stream is partitioned by the specified grouping fields. For example, if the stream is grouped by the field "user-id", tuples with the same "user-id" always go to the same task, while tuples with different "user-id"s may go to different tasks. This is a very important grouping: it lets Storm guarantee that its output is strictly ordered at the "user-id" level, which matters a great deal for order-sensitive applications such as billing systems.
- Partial Key grouping: like fields grouping, the stream is partitioned by the specified grouping fields, but load is balanced across the downstream bolt's tasks, which makes better use of resources when the input data is skewed. The partial key grouping paper explains well how this works and what its advantages are.
- All grouping: the stream is replicated to all of the bolt's tasks. Use this grouping with care; it is for the case where you want a given kind of tuple delivered to every downstream consumer in the topology.
- Global grouping: the entire stream goes to a single one of the bolt's tasks; specifically, the task with the lowest ID.
- None grouping: declares that you don't care how the stream is grouped. Currently, none grouping is equivalent to shuffle grouping.
- Direct grouping: a special kind of grouping. In a stream grouped this way, the producer of a tuple decides which task of the consumer will receive and process it. Direct groupings can only be declared on streams that have been declared as direct streams, and tuples must be emitted to a direct stream with one of the emitDirect methods. A bolt can obtain its consumers' task IDs from the provided TopologyContext, or track them via the emit method of OutputCollector, which returns the IDs of the tasks the tuple was sent to. In the acking implementation, the spout's two direct input streams, ack and ackFail, use this grouping.
- Local or shuffle grouping: if the target bolt has one or more tasks in the same worker process, tuples are shuffled among just those in-process tasks; otherwise this behaves like an ordinary shuffle grouping. The benefit is efficiency: communication inside a worker stays in-process, which is much cheaper than the inter-process communication between workers (worker-to-worker traffic goes over the network via Netty). A declaration sketch for all of these groupings follows this list.
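As a quick reference, here is a minimal declaration sketch for each grouping against the TopologyBuilder API (assuming the usual org.apache.storm imports). The component ids and parallelism values are arbitrary, and WordSpout/SheffleGroupingBolt are the example classes defined later in this post; in a real topology you would declare only one grouping per edge:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new WordSpout(), 2);
builder.setBolt("b1", new SheffleGroupingBolt(), 4).shuffleGrouping("spout");
builder.setBolt("b2", new SheffleGroupingBolt(), 4).fieldsGrouping("spout", new Fields("sentence"));
builder.setBolt("b3", new SheffleGroupingBolt(), 4).partialKeyGrouping("spout", new Fields("sentence"));
builder.setBolt("b4", new SheffleGroupingBolt(), 4).allGrouping("spout");
builder.setBolt("b5", new SheffleGroupingBolt(), 4).globalGrouping("spout");          // goes to the task with the lowest ID
builder.setBolt("b6", new SheffleGroupingBolt(), 4).noneGrouping("spout");            // currently same as shuffle
builder.setBolt("b7", new SheffleGroupingBolt(), 4).directGrouping("spout");          // stream must be declared direct; producer uses emitDirect
builder.setBolt("b8", new SheffleGroupingBolt(), 4).localOrShuffleGrouping("spout");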
Analyzing the grouping strategies through examples
Common configuration (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sonly.strom</groupId>
    <artifactId>strom-study</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.sonly.storm.demo1.HelloToplogy</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
Shuffle grouping
Example code for shuffle grouping. First, the spout (WordSpout):
package com.sonly.storm.demo1.grouppings.spout;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * package: com.sonly.storm.demo1
 * project: stormstudy
 * create date: 2019-05-09 20:27
 * author: xxydliuyss
 * note: If you want to change the file header, please modify the File and Code Templates.
 */
public class WordSpout extends BaseRichSpout {
    public static final Logger LOGGER = LoggerFactory.getLogger(WordSpout.class);
    // topology context
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private Map config;
    // counter so that each spout task emits the test data exactly once
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.config = conf;
        this.context = topologyContext;
        this.collector = collector;
        LOGGER.warn("WordSpout->open:hashcode:{}->ThreadId:{},TaskId:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId());
    }

    public void nextTuple() {
        // fixed test data: 8 x "zhangsan", 2 x "lisi"; emit 10 tuples in total, then stay idle
        String[] sentences = new String[]{"zhangsan", "zhangsan", "zhangsan", "zhangsan", "zhangsan", "zhangsan", "zhangsan", "zhangsan", "lisi", "lisi"};
        int i = atomicInteger.get();
        if (i < 10) {
            atomicInteger.incrementAndGet();
            final String sentence = sentences[i];
            collector.emit(new Values(sentence));
            LOGGER.warn("WordSpout->nextTuple:hashcode:{}->ThreadId:{},TaskId:{},Values:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId(), sentence);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
Bolt SheffleGroupingBolt (declares output field "bolt1"):
package com.sonly.storm.demo1.grouppings;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * package: com.sonly.storm.demo1
 * project: stormstudy
 * create date: 2019-05-09 21:19
 * author: xxydliuyss
 * note: If you want to change the file header, please modify the File and Code Templates.
 */
public class SheffleGroupingBolt extends BaseRichBolt {
    public static final Logger LOGGER = LoggerFactory.getLogger(SheffleGroupingBolt.class);
    private TopologyContext context;
    private Map conf;
    private OutputCollector collector;
    private Map<String, Integer> counts = new HashMap(16);

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf = map;
        this.context = topologyContext;
        this.collector = outputCollector;
        LOGGER.warn("SheffleGroupingBolt->prepare:hashcode:{}->ThreadId:{},TaskId:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId());
    }

    public void execute(Tuple tuple) {
        // read the first field positionally, so this bolt works whether it
        // consumes the spout's "sentence" field or another bolt's output
        String word = tuple.getString(0);
        LOGGER.warn("SheffleGroupingBolt->execute:hashcode:{}->ThreadId:{},TaskId:{},value:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId(), word);
        collector.emit(new Values(word));
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("bolt1"));
    }
}
Bolt SheffleGrouppingBolt1 (declares output field "bolt"):
package com.sonly.storm.demo1.grouppings;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * package: com.sonly.storm.demo1
 * project: stormstudy
 * create date: 2019-05-09 21:29
 * author: xxydliuyss
 * note: If you want to change the file header, please modify the File and Code Templates.
 */
public class SheffleGrouppingBolt1 extends BaseRichBolt {
    public static final Logger LOGGER = LoggerFactory.getLogger(SheffleGrouppingBolt1.class);
    private TopologyContext context;
    private Map conf;
    private OutputCollector collector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf = map;
        this.context = topologyContext;
        this.collector = outputCollector;
        LOGGER.warn("SheffleGrouppingBolt1->prepare:hashcode:{}->ThreadId:{},TaskId:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId());
    }

    public void execute(Tuple tuple) {
        // reads the spout's "sentence" field by name
        String word = tuple.getStringByField("sentence");
        LOGGER.warn("SheffleGroupingBolt1->execute:hashcode:{}->ThreadId:{},TaskId:{},value:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId(), word);
        collector.emit(new Values(word));
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("bolt"));
    }
}
The topology:
package com.sonly.storm.demo1.grouppings;

import com.sonly.storm.demo1.grouppings.spout.WordSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * package: com.sonly.storm.demo1
 * project: stormstudy
 * create date: 2019-05-09 21:55
 * author: xxydliuyss
 * note: If you want to change the file header, please modify the File and Code Templates.
 */
public class ShuffleGroupingToplogy {
    public static final Logger LOGGER = LoggerFactory.getLogger(ShuffleGroupingToplogy.class);

    // Positional arguments:
    //   args[0] topology name
    //   args[1] component prefix
    //   args[2] number of workers
    //   args[3] spout executor count (parallelism hint)
    //   args[4] spout task size
    //   args[5] bolt executor count (parallelism hint)
    //   args[6] bolt task size
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        Config conf = new Config();
        conf.setDebug(true);
        if (args == null || args.length < 7) {
            // no arguments: run a local test topology instead
            conf.setNumWorkers(3);
            builder.setSpout("spout", new WordSpout(), 4).setNumTasks(4);
            builder.setBolt("split-bolt", new SheffleGrouppingBolt1(), 4).shuffleGrouping("spout").setNumTasks(8);
            // group on the "bolt" field declared by SheffleGrouppingBolt1
            builder.setBolt("count-bolt", new SheffleGroupingBolt(), 8).fieldsGrouping("split-bolt", new Fields("bolt")).setNumTasks(8);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("word-count");
            cluster.shutdown();
        } else {
            Options options = Options.builder(args);
            LOGGER.warn("The Topology Options {} is Submitted ", options.toString());
            conf.setNumWorkers(options.getWorkers());
            builder.setSpout(options.getPrefix() + "-spout", new WordSpout(), options.getSpoutParallelismHint()).setNumTasks(options.getSpoutTaskSize());
            // both bolts subscribe to the spout's stream independently
            builder.setBolt("bolt1", new SheffleGrouppingBolt1(), options.getBoltParallelismHint()).shuffleGrouping(options.getPrefix() + "-spout").setNumTasks(options.getBoltTaskSize());
            builder.setBolt("bolt", new SheffleGroupingBolt(), options.getBoltParallelismHint()).shuffleGrouping(options.getPrefix() + "-spout").setNumTasks(options.getBoltTaskSize());
            try {
                StormSubmitter.submitTopologyWithProgressBar(options.getTopologyName(), conf, builder.createTopology());
                LOGGER.warn("===========================================================");
                LOGGER.warn("The Topology {} is Submitted ", options.getTopologyName());
                LOGGER.warn("===========================================================");
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }

    public static class Options {
        private String topologyName;
        private String prefix;
        private Integer workers;
        private Integer spoutParallelismHint;
        private Integer spoutTaskSize;
        private Integer boltParallelismHint;
        private Integer boltTaskSize;

        public Options(String topologyName, String prefix, Integer workers, Integer spoutParallelismHint, Integer spoutTaskSize, Integer boltParallelismHint, Integer boltTaskSize) {
            this.topologyName = topologyName;
            this.prefix = prefix;
            this.workers = workers;
            this.spoutParallelismHint = spoutParallelismHint;
            this.spoutTaskSize = spoutTaskSize;
            this.boltParallelismHint = boltParallelismHint;
            this.boltTaskSize = boltTaskSize;
        }

        public static Options builder(String[] args) {
            return new Options(args[0], args[1], Integer.parseInt(args[2]),
                    Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]), Integer.parseInt(args[6]));
        }

        public String getTopologyName() {
            return topologyName;
        }

        public void setTopologyName(String topologyName) {
            this.topologyName = topologyName;
        }

        public String getPrefix() {
            return prefix;
        }

        public void setPrefix(String prefix) {
            this.prefix = prefix;
        }

        public Integer getWorkers() {
            return workers;
        }

        public void setWorkers(Integer workers) {
            this.workers = workers;
        }

        public Integer getSpoutParallelismHint() {
            return spoutParallelismHint;
        }

        public void setSpoutParallelismHint(Integer spoutParallelismHint) {
            this.spoutParallelismHint = spoutParallelismHint;
        }

        public Integer getSpoutTaskSize() {
            return spoutTaskSize;
        }

        public void setSpoutTaskSize(Integer spoutTaskSize) {
            this.spoutTaskSize = spoutTaskSize;
        }

        public Integer getBoltParallelismHint() {
            return boltParallelismHint;
        }

        public void setBoltParallelismHint(Integer boltParallelismHint) {
            this.boltParallelismHint = boltParallelismHint;
        }

        public Integer getBoltTaskSize() {
            return boltTaskSize;
        }

        public void setBoltTaskSize(Integer boltTaskSize) {
            this.boltTaskSize = boltTaskSize;
        }

        @Override
        public String toString() {
            return "Options{" +
                    "topologyName='" + topologyName + '\'' +
                    ", prefix='" + prefix + '\'' +
                    ", workers=" + workers +
                    ", spoutParallelismHint=" + spoutParallelismHint +
                    ", spoutTaskSize=" + spoutTaskSize +
                    ", boltParallelismHint=" + boltParallelismHint +
                    ", boltTaskSize=" + boltTaskSize +
                    '}';
        }
    }
}
Run mvn package and upload the resulting jar to the Storm server.
ShuffleGrouping sample analysis
1) Sample 1
1. Run:
storm jar strom-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.sonly.storm.demo1.grouppings.ShuffleGroupingToplogy ShuffleGrouping ShuffleGrouping 1 2 1 2 1
2. Parameters:
topologyName='ShuffleGrouping', prefix='ShuffleGrouping', workers=1, spoutParallelismHint=2, spoutTaskSize=1, boltParallelismHint=2, boltTaskSize=1
3. Topology diagram: one spout feeding two bolts.
4. Distribution of the bolts across workers:
5. On the server, look at the log of the first bolt:
2019-05-07 18:09:13.109 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.110 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.110 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.111 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.112 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.115 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.116 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.117 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 18:09:13.118 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:lisi
2019-05-07 18:09:13.119 c.s.s.d.g.SheffleGrouppingBolt1 Thread-11-bolt1-executor[5 5] [WARN] SheffleGroupingBolt1->execute:hashcode:1393282516->ThreadId:45,TaskId:5,value:lisi
6. Now the other bolt's log; all 10 messages were processed here too:
2019-05-07 18:09:00.791 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.793 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.794 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.795 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.795 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.796 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.797 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.805 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:zhangsan
2019-05-07 18:09:00.805 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:lisi
2019-05-07 18:09:00.806 c.s.s.d.g.SheffleGroupingBolt Thread-9-bolt-executor[4 4] [WARN] SheffleGroupingBolt->execute:hashcode:1430296959->ThreadId:43,TaskId:4,value:lisi
So both bolts processed all 10 messages.
Summary:
When a spout feeds two bolts directly, shuffleGrouping does not split the messages between the two bolts: each bolt component subscribes to the stream independently and receives the full set of tuples. Shuffle grouping only balances tuples among the tasks of a single bolt component, as the sketch below illustrates.
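A sketch of the two wirings, using the classes from this example (component ids are the sample's; only the wiring differs):
// (a) this sample: two separate bolt components, each subscribing to the
// spout's stream, so each component receives every tuple in full
builder.setBolt("bolt1", new SheffleGrouppingBolt1(), 2).shuffleGrouping("spout");
builder.setBolt("bolt", new SheffleGroupingBolt(), 2).shuffleGrouping("spout");
// (b) by contrast, a single bolt component with two tasks: shuffle grouping
// splits the stream between the two tasks of that one component, roughly half each
builder.setBolt("bolt1", new SheffleGrouppingBolt1(), 2).setNumTasks(2).shuffleGrouping("spout");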
2) Sample 2
1. Change the parameters and run again:
topologyName='ShuffleGrouping1', prefix='ShuffleGrouping1', workers=2, spoutParallelismHint=1, spoutTaskSize=2, boltParallelismHint=2, boltTaskSize=2
There are now 4 bolt tasks and 2 spout tasks. The spouts produced 20 messages in total (each of the two spout tasks emits 10), and the stream was transferred 40 times, because each message is delivered once to each of the two subscribing bolt components.
Look at how the messages were distributed over the 4 bolt tasks: since there are only two workers, two bolt tasks share each worker and their log lines are interleaved, but they can be told apart by name, and each bolt task again received 10 messages.
2. Change the topology structure as follows.
3. Modify the code.
In the bolt, read the field emitted by bolt1:
String word = tuple.getStringByField("bolt");
In the topology, chain the two bolts instead of wiring both to the spout:
builder.setBolt("bolt1", new SheffleGrouppingBolt1(), 1).shuffleGrouping(options.getPrefix()+"-spout");
builder.setBolt("bolt", new SheffleGroupingBolt(), options.getBoltParallelismHint()).shuffleGrouping("bolt1").setNumTasks(options.getBoltTaskSize());
4. Parameters:
topologyName='ShuffleGrouping2', prefix='ShuffleGrouping2', workers=2, spoutParallelismHint=1, spoutTaskSize=1, boltParallelismHint=2, boltTaskSize=2
5. Check the logs. The k8s-n2 node hosts only a task of bolt; bolt1's single task is on k8s-n3:
[root@k8s-n2 6706]# grep "SheffleGroupingBolt->execute" worker.log |wc -l
3
[root@k8s-n2 6706]# grep "SheffleGroupingBolt1->execute" worker.log |wc -l
0
[root@k8s-n3 6706]# grep "SheffleGroupingBolt->execute" worker.log |wc -l
7
[root@k8s-n3 6706]# grep "SheffleGroupingBolt1->execute" worker.log |wc -l
10
You can see that on the bolt1->bolt edge the 10 messages were split randomly between the two downstream tasks: 3 to one and 7 to the other.
Summary:
For a bolt-to-bolt shuffleGrouping, messages are distributed randomly among the downstream bolt's tasks.
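Conceptually, shuffle grouping picks exactly one target task per tuple. The following is only a hypothetical illustration of that behavior, not Storm's actual implementation (which uses a shuffled round-robin to keep the distribution even):
import java.util.List;
import java.util.Random;

class ShuffleChooserSketch {
    private final Random random = new Random();

    // one incoming tuple -> one of the downstream bolt's task IDs
    int chooseTask(List<Integer> targetTaskIds) {
        return targetTaskIds.get(random.nextInt(targetTaskIds.size()));
    }
}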
Fields grouping
Example for fields grouping
Code:
// Package declaration and imports are omitted; they are the same as in the
// shuffle-grouping example (package com.sonly.storm.demo1.grouppings.fieldgrouping).
public class FeildGroupingToplogy {
    public static final Logger LOGGER = LoggerFactory.getLogger(FeildGroupingToplogy.class);

    // Positional arguments:
    //   args[0] topology name
    //   args[1] component prefix
    //   args[2] number of workers
    //   args[3] spout executor count (parallelism hint)
    //   args[4] spout task size
    //   args[5] bolt executor count (parallelism hint)
    //   args[6] bolt task size
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        Config conf = new Config();
        conf.setDebug(true);
        Options options = Options.builder(args);
        LOGGER.warn("The Topology Options {} is Submitted ", options.toString());
        conf.setNumWorkers(options.getWorkers());
        String spoutName = options.getPrefix() + "-spout";
        builder.setSpout(spoutName, new WordSpout(), options.getSpoutParallelismHint()).setNumTasks(options.getSpoutTaskSize());
        // Sample 1: both bolts subscribe to the spout, grouped by the "sentence" field
        builder.setBolt(options.getPrefix() + "bolt1", new FieldGrouppingBolt1(), options.getBoltParallelismHint()).fieldsGrouping(spoutName, new Fields("sentence")).setNumTasks(options.getBoltTaskSize());
        builder.setBolt(options.getPrefix() + "bolt", new FieldGroupingBolt(), options.getBoltParallelismHint()).fieldsGrouping(spoutName, new Fields("sentence")).setNumTasks(options.getBoltTaskSize());
        // Sample 2 wiring (used later): bolt1 -> bolt, grouped on bolt1's "bolt" field
        // builder.setBolt("bolt1", new FieldGrouppingBolt1(), 1).shuffleGrouping(options.getPrefix() + "-spout");
        // builder.setBolt("bolt", new FieldGroupingBolt(), options.getBoltParallelismHint()).fieldsGrouping("bolt1", new Fields("bolt")).setNumTasks(options.getBoltTaskSize());
        try {
            StormSubmitter.submitTopologyWithProgressBar(options.getTopologyName(), conf, builder.createTopology());
            LOGGER.warn("===========================================================");
            LOGGER.warn("The Topology {} is Submitted ", options.getTopologyName());
            LOGGER.warn("===========================================================");
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }

    public static class Options {
        private String topologyName;
        private String prefix;
        private Integer workers;
        private Integer spoutParallelismHint;
        private Integer spoutTaskSize;
        private Integer boltParallelismHint;
        private Integer boltTaskSize;

        public Options(String topologyName, String prefix, Integer workers, Integer spoutParallelismHint, Integer spoutTaskSize, Integer boltParallelismHint, Integer boltTaskSize) {
            this.topologyName = topologyName;
            this.prefix = prefix;
            this.workers = workers;
            this.spoutParallelismHint = spoutParallelismHint;
            this.spoutTaskSize = spoutTaskSize;
            this.boltParallelismHint = boltParallelismHint;
            this.boltTaskSize = boltTaskSize;
        }

        public static Options builder(String[] args) {
            return new Options(args[0], args[1], Integer.parseInt(args[2]),
                    Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]), Integer.parseInt(args[6]));
        }

        public String getTopologyName() {
            return topologyName;
        }

        public void setTopologyName(String topologyName) {
            this.topologyName = topologyName;
        }

        public String getPrefix() {
            return prefix;
        }

        public void setPrefix(String prefix) {
            this.prefix = prefix;
        }

        public Integer getWorkers() {
            return workers;
        }

        public void setWorkers(Integer workers) {
            this.workers = workers;
        }

        public Integer getSpoutParallelismHint() {
            return spoutParallelismHint;
        }

        public void setSpoutParallelismHint(Integer spoutParallelismHint) {
            this.spoutParallelismHint = spoutParallelismHint;
        }

        public Integer getSpoutTaskSize() {
            return spoutTaskSize;
        }

        public void setSpoutTaskSize(Integer spoutTaskSize) {
            this.spoutTaskSize = spoutTaskSize;
        }

        public Integer getBoltParallelismHint() {
            return boltParallelismHint;
        }

        public void setBoltParallelismHint(Integer boltParallelismHint) {
            this.boltParallelismHint = boltParallelismHint;
        }

        public Integer getBoltTaskSize() {
            return boltTaskSize;
        }

        public void setBoltTaskSize(Integer boltTaskSize) {
            this.boltTaskSize = boltTaskSize;
        }

        @Override
        public String toString() {
            return "Options{" +
                    "topologyName='" + topologyName + '\'' +
                    ", prefix='" + prefix + '\'' +
                    ", workers=" + workers +
                    ", spoutParallelismHint=" + spoutParallelismHint +
                    ", spoutTaskSize=" + spoutTaskSize +
                    ", boltParallelismHint=" + boltParallelismHint +
                    ", boltTaskSize=" + boltTaskSize +
                    '}';
        }
    }
}
public class FieldGroupingBolt extends BaseRichBolt {
    public static final Logger LOGGER = LoggerFactory.getLogger(FieldGroupingBolt.class);
    private TopologyContext context;
    private Map conf;
    private OutputCollector collector;
    private Map<String, Integer> counts = new HashMap(16);

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf = map;
        this.context = topologyContext;
        this.collector = outputCollector;
        LOGGER.warn("FieldGroupingBolt->prepare:hashcode:{}->ThreadId:{},TaskId:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId());
    }

    public void execute(Tuple tuple) {
        // Reads the "bolt" field emitted by FieldGrouppingBolt1 (the sample-2
        // bolt1 -> bolt wiring); when this bolt is subscribed directly to the
        // spout, as in sample 1, the field to read would be "sentence".
        String word = tuple.getStringByField("bolt");
        LOGGER.warn("FieldGroupingBolt->execute:hashcode:{}->ThreadId:{},TaskId:{},value:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId(), word);
        collector.emit(new Values(word));
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("bolt1"));
    }
}
public class FieldGrouppingBolt1 extends BaseRichBolt {
    public static final Logger LOGGER = LoggerFactory.getLogger(FieldGrouppingBolt1.class);
    private TopologyContext context;
    private OutputCollector collector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.context = topologyContext;
        this.collector = outputCollector;
        LOGGER.warn("FieldGrouppingBolt1->prepare:hashcode:{}->ThreadId:{},TaskId:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId());
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("sentence");
        // Note: the log prefix "SheffleGroupingBolt1" was carried over from the
        // earlier example; the grep commands below rely on this exact string.
        LOGGER.warn("SheffleGroupingBolt1->execute:hashcode:{}->ThreadId:{},TaskId:{},value:{}", this.hashCode(), Thread.currentThread().getId(), context.getThisTaskId(), word);
        collector.emit(new Values(word));
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("bolt"));
    }
}
FieldGrouping sample analysis
1) Sample 1
1. Package the code above and upload it to the Storm server.
2. Run:
storm jar strom-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.sonly.storm.demo1.grouppings.fieldgrouping.FeildGroupingToplogy FieldGrouping1 FieldGrouping1 2 1 1 2 2
3. Parameters:
topologyName='FieldGrouping1', prefix='FieldGrouping1', workers=2, spoutParallelismHint=1, spoutTaskSize=1, boltParallelismHint=2, boltTaskSize=2
4. Topology diagram:
5. Parallelism and component distribution:
The UI shows the messages were emitted 20 times but transferred 40 times. This is because with a spout feeding two bolt components, the stream is duplicated and delivered in full to each subscribing component, so each bolt received all 10 messages.
Summary:
As with shuffleGrouping, a spout-to-bolt edge delivers the full stream to every subscribing bolt component: each bolt receives all of the spout's messages.
2) Sample 2
1. Modify the topology code:
public static void main(String[] args) throws InterruptedException {
    TopologyBuilder builder = new TopologyBuilder();
    Config conf = new Config();
    conf.setDebug(true);
    Options options = Options.builder(args);
    LOGGER.warn("The Topology Options {} is Submitted ", options.toString());
    conf.setNumWorkers(options.getWorkers());
    String spoutName = options.getPrefix() + "-spout";
    builder.setSpout(spoutName, new WordSpout(), options.getSpoutParallelismHint()).setNumTasks(options.getSpoutTaskSize());
    // builder.setBolt(options.getPrefix() + "bolt1", new FieldGrouppingBolt1(), options.getBoltParallelismHint()).fieldsGrouping(spoutName, new Fields("sentence")).setNumTasks(options.getBoltTaskSize());
    // builder.setBolt(options.getPrefix() + "bolt", new FieldGroupingBolt(), options.getBoltParallelismHint()).fieldsGrouping(spoutName, new Fields("sentence")).setNumTasks(options.getBoltTaskSize());
    // Sample 2: chain bolt1 -> bolt, fields-grouped on bolt1's "bolt" field
    builder.setBolt("bolt1", new FieldGrouppingBolt1(), 1).fieldsGrouping(spoutName, new Fields("sentence"));
    builder.setBolt("bolt", new FieldGroupingBolt(), options.getBoltParallelismHint()).fieldsGrouping("bolt1", new Fields("bolt")).setNumTasks(options.getBoltTaskSize());
    try {
        StormSubmitter.submitTopologyWithProgressBar(options.getTopologyName(), conf, builder.createTopology());
        LOGGER.warn("===========================================================");
        LOGGER.warn("The Topology {} is Submitted ", options.getTopologyName());
        LOGGER.warn("===========================================================");
    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
        e.printStackTrace();
    }
}
2. Package the modified code, upload it to the server, and run:
storm jar strom-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.sonly.storm.demo1.grouppings.fieldgrouping.FeildGroupingToplogy FieldGrouping2 FieldGrouping2 2 1 1 2 2
3. Parameters:
topologyName='FieldGrouping2', prefix='FieldGrouping2', workers=2, spoutParallelismHint=1, spoutTaskSize=1, boltParallelismHint=2, boltTaskSize=2
4. Topology diagram:
5. Parallelism and component distribution:
6. Based on the distribution, check each worker's log to see how the messages were delivered.
On node k8s-n3:
[root@k8s-n3 6701]# grep "SheffleGroupingBolt1->execute" worker.log|wc -l
10
[root@k8s-n3 6701]# grep "FieldGroupingBolt->execute" worker.log|wc -l
2
On node k8s-n2:
[root@k8s-n2 6701]# grep "SheffleGroupingBolt1->execute" worker.log|wc -l
0
[root@k8s-n2 6701]# grep "FieldGroupingBolt->execute" worker.log|wc -l
8
Now let's look at the details.
k8s-n3: bolt1 received all 10 messages. Because bolt1 has only a single task, the fields grouping has no visible effect on it.
[root@k8s-n3 6701]# grep "SheffleGroupingBolt1->execute" worker.log
2019-05-07 21:59:35.805 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.810 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.810 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.811 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.811 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.812 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.813 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.814 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:zhangsan
2019-05-07 21:59:35.815 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:lisi
2019-05-07 21:59:35.838 c.s.s.d.g.f.FieldGrouppingBolt1 Thread-7-bolt1-executor[6 6] [WARN] SheffleGroupingBolt1->execute:hashcode:107880849->ThreadId:41,TaskId:6,value:lisi
k8s-n3: bolt has two tasks and its input is grouped by field. This task received two messages, both 'lisi':
[root@k8s-n3 6701]# grep "FieldGroupingBolt->execute" worker.log
2019-05-07 21:59:35.855 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[4 4] [WARN] FieldGroupingBolt->execute:hashcode:281792799->ThreadId:45,TaskId:4,value:lisi
2019-05-07 21:59:35.856 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[4 4] [WARN] FieldGroupingBolt->execute:hashcode:281792799->ThreadId:45,TaskId:4,value:lisi
k8s-n2: the bolt task here should then hold the remaining 8 messages; let's verify:
[root@k8s-n2 6701]# grep "FieldGroupingBolt->execute" worker.log
2019-05-07 21:59:48.315 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 21:59:48.317 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 21:59:48.317 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 21:59:48.318 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 21:59:48.318 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 21:59:48.319 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 21:59:48.319 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
2019-05-07 21:59:48.320 c.s.s.d.g.f.FieldGroupingBolt Thread-11-bolt-executor[5 5] [WARN] FieldGroupingBolt->execute:hashcode:1858735164->ThreadId:45,TaskId:5,value:zhangsan
Summary:
On a bolt-to-bolt edge, fields grouping partitions tuples by the value of the grouping field: tuples with the same key are always routed to the same bolt task.
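Conceptually, fields grouping hashes the values of the grouping fields and maps the hash onto the list of target tasks, so equal keys always select the same task. A hypothetical sketch of that idea (not Storm's actual source):
import java.util.Arrays;
import java.util.List;

class FieldsChooserSketch {
    // same grouping-field values -> same hash -> same downstream task
    int chooseTask(List<Object> groupingValues, int numTargetTasks) {
        int hash = Arrays.deepHashCode(groupingValues.toArray());
        return (hash % numTargetTasks + numTargetTasks) % numTargetTasks; // non-negative modulo
    }
}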
Now, what happens if you run:
storm jar strom-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.sonly.storm.demo1.grouppings.fieldgrouping.FeildGroupingToplogy FieldGrouping3 FieldGrouping3 4 1 1 4 4
How will the bolt tasks be assigned then? I'll leave that for you to think about: after fields grouping there are only two distinct keys in this example, yet they have to be distributed among 4 bolt tasks. How is that assignment made? I'll reveal the answer in the next post!
In the next post I'll continue analyzing grouping strategies and share the thought process I went through while learning Storm. If anything here is wrong or unclear, please leave a comment and we can discuss it together.