以單詞分割計數為例實現Storm的DirectGroup分組:
1、Spout實現
Spout是Storm的數據源頭。使用Direct Grouping方式將Spout數據發送至指定的Bolt時,需注意:
1)、需獲取消費該Spout數據的Bolt的Task編號(Task編號即Storm中Executor的編號),如下代碼在Spout.open()初始化時取得消費者的Task編號
2)、需使用SpoutOutputCollector.emitDirect()方法
3)、將Spout聲明為直接流,即在Spout.declareOutputFields()聲明
/**
* Fixed Cycle Spout
*
* @author hanhan.zhang
* */
public class FixedCycleSpout implements IRichSpout {
private String _fieldName;
private boolean _direct;
// stream mark
private String _streamId;
private int _index;
// key = msgId, value = sending tuple
private Map<String, List<Object>> _pendingTuple;
// send tuple
private List<Object> [] _sendTuple;
private SpoutOutputCollector _collector;
private CountMetric _sendMetric;
private CountMetric _failMetric;
// consume task set
private List<Integer> _consumeTaskIdList;
public FixedCycleSpout(String _streamId, String _fieldName, boolean _direct, List<Object> ... _sendTuple) {
this._streamId = _streamId;
this._fieldName = _fieldName;
this._direct = _direct;
this._sendTuple = _sendTuple;
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this._index = 0;
_pendingTuple = Maps.newHashMap();
// register metric
this._sendMetric = context.registerMetric("cycle.spout.send.tuple.metric", new CountMetric(), 60);
this._failMetric = context.registerMetric("cycle.spout.fail.tuple.metric", new CountMetric(), 60);
this._collector = collector;
// get consume task id
if (this._direct) {
this._consumeTaskIdList = Lists.newLinkedList();
Map<String, Map<String, Grouping>> consumeTargets = context.getThisTargets();
if (consumeTargets != null && !consumeTargets.isEmpty()) {
// streamId = this._streamId
consumeTargets.forEach((streamId, target) -> {
if (target != null && !target.isEmpty()) {
// componentId = consume target component Id
target.forEach((componentId, group) -> {
if (group.is_set_direct()) {
this._consumeTaskIdList.addAll(context.getComponentTasks(componentId));
}
});
}
});
}
}
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
this._sendMetric.incr();
if (this._index == _sendTuple.length) {
this._index = 0;
}
String msgId = UUID.randomUUID().toString();
List<Object> tuple = this._sendTuple[this._index++];
sendTuple(msgId, tuple);
}
@Override
public void ack(Object msgId) {
String msgIdStr = (String) msgId;
System.out.println("ack tuple with msgId " + msgIdStr);
this._pendingTuple.remove(msgIdStr);
}
@Override
public void fail(Object msgId) {
this._failMetric.incr();
String msgIdStr = (String) msgId;
System.out.println("fail tuple with msgId " + msgIdStr);
sendTuple(msgIdStr, this._pendingTuple.get(msgIdStr));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(this._streamId, this._direct, new Fields(_fieldName));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
protected void sendTuple(String msgId, List<Object> tuple) {
this._pendingTuple.put(msgId, tuple);
if (this._direct) {
if (this._consumeTaskIdList == null || this._consumeTaskIdList.isEmpty()) {
throw new IllegalStateException("direct task is empty !");
}
this._consumeTaskIdList.forEach(taskId ->
this._collector.emitDirect(taskId, this._streamId, tuple, msgId));
} else {
this._collector.emit(tuple, msgId);
}
}
}
2、Bolt實現
/** * Sentence Split Bolt * * @author hanhan.zhang * */ public class SentenceSplitBolt implements IRichBolt { private OutputCollector _collector; private CountMetric _ackMetric; private CountMetric _failMetric; private String _separator; private int _taskId; private boolean _direct; private String _streamId; public SentenceSplitBolt(String _streamId, boolean _direct) { this._streamId = _streamId; this._direct = _direct; } /** * @param context * 1: Register Metric * 2: Next Bolt Message * @param collector (thread-safe) * 1: Emit Tuple * 2: Ack/Fail Tuple * */ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this._collector = collector; // register metric for monitor this._ackMetric = context.registerMetric("sentence.split.ack.metric", new CountMetric(), 60); this._failMetric = context.registerMetric("sentence.split.fail.metric", new CountMetric(), 60); this._taskId = context.getThisTaskId(); this._separator = (String) stormConf.get(Const.SEPARATOR); } @Override public void execute(Tuple input) { try { String sentence = input.getString(0); if (Strings.isNullOrEmpty(sentence)) { return; } String []fields = sentence.split(_separator); for (String field : fields) { if (this._direct) { this._collector.emitDirect(this._taskId, _streamId, input, new Values(field, 1)); } else { this._collector.emit(this._streamId, input, new Values(field, 1)); } } this._collector.ack(input); this._ackMetric.incr(); } catch (Exception e) { this._collector.fail(input); this._failMetric.incr(); } } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(this._streamId, this._direct, new Fields("word", "count")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } /** * Word Sum Bolt * * @author hanhan.zhang * */ public class WordSumBolt extends BaseRichBolt { private OutputCollector _collector; private int _taskId; private Cache<String, 
AtomicInteger> _wordCache; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this._collector = collector; this._taskId = context.getThisTaskId(); this._wordCache = CacheBuilder.newBuilder() .maximumSize(1024) .expireAfterWrite(3, TimeUnit.SECONDS) .removalListener((removalNotification) -> { String key = (String) removalNotification.getKey(); AtomicInteger sum = (AtomicInteger) removalNotification.getValue(); System.out.println("word sum result : [" + key + "," + sum.get() + "]"); }) .build(); } @Override public void execute(Tuple input) { try { String word = input.getString(0); int count = input.getInteger(1); if (Strings.isEmpty(word)) { return; } AtomicInteger counter = this._wordCache.getIfPresent(word); if (counter == null) { this._wordCache.put(word, new AtomicInteger(count)); } else { counter.addAndGet(count); } this._collector.ack(input); } catch (Exception e) { this._collector.fail(input); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
3、Storm運行
/** * Tuple Split-Flow Topology * * @author hanhan.zhang * */ public class FlowTopology { public static void main(String[] args) { // send tuple List<Object> []tuple = new List[] {new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")}; //stream name String spoutStreamId = "topology.flow.cycle.spout.stream"; String splitStreamId = "topology.flow.split.bolt.stream"; // spout FixedCycleSpout cycleSpout = new FixedCycleSpout(spoutStreamId, "sentence", true, tuple); // bolt SentenceSplitBolt splitBolt = new SentenceSplitBolt(splitStreamId, false); WordSumBolt sumBolt = new WordSumBolt(); TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout ("sentence.cycle.spout", cycleSpout, 1); topologyBuilder.setBolt("sentence.split.bolt", splitBolt, 1) .directGrouping("sentence.cycle.spout", spoutStreamId); topologyBuilder.setBolt("word.sum.bolt", sumBolt, 3) .fieldsGrouping("sentence.split.bolt", splitStreamId, new Fields("word")); Config config = new Config(); config.put(Const.SEPARATOR, " "); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("flowTopology", config, topologyBuilder.createTopology()); } }
