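The first step in developing a Storm application is designing the topology. To help newcomers get started, we begin with a simple example whose job is to append the suffix "!!!" to each word and then print the result. The topology diagram for the example is shown below: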
The topology has three parts:
TestWordSpout: the data source, which emits words
ExclamationBolt: appends the suffix to each word
PrintBolt: prints each word
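The data flow through these three components can be sketched in plain Java, without any Storm classes. This is only an illustration of what each stage does to a word; the class and method names (PipelineSketch, nextWord, exclaim, formatForPrint) are hypothetical and are not part of Storm or of this project. The suffixes match what the bolts in the code of this article append.

```java
import java.util.Random;

// Plain-Java sketch of the spout -> bolt -> bolt data flow; no Storm classes involved.
// All names here are hypothetical and exist only for illustration.
class PipelineSketch {
    // The same sample words that the spout in this article emits.
    static final String[] WORDS = {"fdfs", "fdfs", "ffsdfs"};

    // Stands in for TestWordSpout: picks one word at random.
    static String nextWord(Random random) {
        return WORDS[random.nextInt(WORDS.length)];
    }

    // Stands in for ExclamationBolt: appends the suffix to the word.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // Stands in for PrintBolt: builds the line that gets logged.
    static String formatForPrint(String word) {
        return word + ".......";
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Push three words through the two processing stages.
        for (int i = 0; i < 3; i++) {
            System.out.println(formatForPrint(exclaim(nextWord(random))));
        }
    }
}
```

In the real topology each stage runs as a separate component and the words travel between them as tuples; the sketch only shows the transformation applied at each step.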
Implementation:
1. Create a Maven project in IDEA and add the Maven dependency
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ysl</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/storm/storm -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.ysl.WordsTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
TestWordSpout:
package com.ysl.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;

// Data source: emits one randomly chosen word per second.
public class TestWordSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(TestWordSpout.class);
    private SpoutOutputCollector collector = null;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Keep the collector so that nextTuple() can emit through it.
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Throttle emission to roughly one tuple per second.
        Utils.sleep(1000);
        final String[] words = new String[]{"fdfs", "fdfs", "ffsdfs"};
        final Random random = new Random();
        final String word = words[random.nextInt(words.length)];
        // Emitted without a message id, so this tuple is not tracked for acking.
        collector.emit(new Values(word));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple has a single field named "word".
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
ExclamationBolt:
package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

// Appends "!!!" to each incoming word and emits the result.
public class ExclamationBolt extends BaseRichBolt {
    private static final Logger logger = LoggerFactory.getLogger(ExclamationBolt.class);
    private OutputCollector collector = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Anchor the new tuple to the input tuple, then ack the input.
        this.collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple has a single field named "word".
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
PrintBolt:
package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

// Terminal bolt: logs each incoming word and emits nothing.
public class PrintBolt extends BaseRichBolt {
    private static final Logger logger = LoggerFactory.getLogger(PrintBolt.class);
    private OutputCollector collector = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Log the word followed by a run of dots, then ack the input tuple.
        logger.info(tuple.getString(0) + ".......");
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // No output fields: this bolt does not emit tuples.
    }
}
WordsTopology:
package com.ysl;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import com.ysl.bolts.ExclamationBolt;
import com.ysl.bolts.PrintBolt;
import com.ysl.spouts.TestWordSpout;

public class WordsTopology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // Wire the components: word -> exclaim -> print, one executor each.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("word", new TestWordSpout(), 1);
        topologyBuilder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
        topologyBuilder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name.
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, topologyBuilder.createTopology());
        } else {
            // Local mode: run in-process for 30 seconds, then shut down.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test", config, topologyBuilder.createTopology());
            Utils.sleep(30000);
            localCluster.killTopology("test");
            localCluster.shutdown();
        }
    }
}
2. Package and run
Package the application with Maven using the following command:
mvn clean install
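Storm can run a topology in two ways. The first is local mode, which suits development and debugging: simply run the main function in IDEA. In local mode the code sleeps for a fixed time (30 seconds here) and then kills the topology itself.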
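The second is remote cluster mode. Cluster mode requires a jar that contains your code together with the libraries it depends on (Storm's own jars need not be included; they are added to the classpath on the worker nodes automatically). If you use Maven, the Maven Assembly Plugin can build this jar for you; see the Maven configuration above. Note that this configuration declares no execution for the plugin, so mvn clean install on its own builds only the plain project jar; the jar-with-dependencies is produced when the assembly:single goal is run as well. The plain jar is enough for this example because storm-core is its only dependency.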
To run remotely, submit the topology to the Storm cluster with the storm command:
storm jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar com.ysl.WordsTopology testfrfr
After running the command above, log output like the following indicates that the submission succeeded:
346 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
351 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
Start uploading file '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
[==================================================] 6196 / 6196
File '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' uploaded to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
363 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
363 [main] INFO backtype.storm.StormSubmitter - Submitting topology testfrfr in distributed mode with conf {"topology.workers":3,"topology.debug":true}
448 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology:
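The trailing testfrfr in the storm jar command is passed to the main method as args[0], which is why the log reports the topology under that name. The branch that main takes can be sketched without any Storm calls as follows; ModeSketch and describe are hypothetical names used only for this illustration, and the returned strings stand in for the real submit calls.

```java
// Sketch of the argument check in WordsTopology.main, with the Storm calls
// replaced by descriptive strings. ModeSketch and describe are hypothetical names.
class ModeSketch {
    // Returns a description of what main would do for the given arguments.
    static String describe(String[] args) {
        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is used as the topology name.
            return "submit to cluster as '" + args[0] + "'";
        }
        // No arguments: run in-process under the fixed name "test".
        return "run locally as 'test'";
    }

    public static void main(String[] args) {
        System.out.println(describe(args));
    }
}
```

Running main from IDEA with no program arguments therefore selects local mode, while any first argument selects cluster submission.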
Terminating a topology
To terminate a topology, run:
storm kill {stormname}
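Here {stormname} is the name given when the topology was submitted to the Storm cluster.
Storm does not terminate the topology immediately. Instead, it first deactivates all spouts so that they emit no new tuples, then waits Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds before killing all worker processes. This gives the topology enough time to finish the tuples that were still in flight when the storm kill command was issued.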
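The ordering described above (stop emitting, wait, then stop the workers) can be imitated with a toy simulation in plain Java. This is not how Storm is implemented; it only shows why the waiting period lets pending work finish. GracefulKillSketch, runAndKill, and the graceMillis parameter are hypothetical, with graceMillis standing in for the message timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a graceful kill; all names are hypothetical, no Storm classes are used.
class GracefulKillSketch {
    // Emits `emitted` items, stops emitting, waits `graceMillis`, then stops the
    // worker. Returns how many items were processed before the worker stopped.
    static int runAndKill(int emitted, long graceMillis) {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // The "worker" keeps processing pending items until it is interrupted.
        worker.execute(() -> {
            try {
                while (true) {
                    pending.take();
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            // The "spout" emits its items; after this loop nothing new is emitted.
            for (int i = 0; i < emitted; i++) {
                pending.put("word" + i);
            }
            // Grace period: pending items get time to complete.
            Thread.sleep(graceMillis);
            // Only now is the worker stopped.
            worker.shutdownNow();
            worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("simulation interrupted", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed before shutdown: " + runAndKill(5, 200));
    }
}
```

If the worker were stopped right after the last item was emitted, some items could still be sitting in the queue; the waiting period is what gives them time to drain.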