一、配置開發環境
storm有兩種操作模式: 本地模式和遠程模式。使用本地模式的時候,你可以在你的本地機器上開發測試你的topology, 一切都在你的本地機器上模擬出來; 用遠程模式的時候你提交的topology會在一個集群的機器上執行。
建議使用maven,只需要加上storm的依賴就可以了。
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency>
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.ljh.storm</groupId> <artifactId>storm-helloworld</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>storm-helloworld</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>cn.ljh.storm.helloworld.ExclamationTopology</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
二、HelloWorld關聯代碼
ExclamationTopology.java
package cn.ljh.storm.helloworld; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class ExclamationTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 1); builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word"); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test3", conf, builder.createTopology()); Utils.sleep(20000); cluster.killTopology("test3"); cluster.shutdown(); } } }
TestWordSpout.java
package cn.ljh.storm.helloworld; import org.apache.storm.topology.OutputFieldsDeclarer; import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); SpoutOutputCollector _collector; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
ExclamationBolt.java
package cn.ljh.storm.helloworld; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
PrintBolt.java
package cn.ljh.storm.helloworld; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrintBolt extends BaseRichBolt { private static Logger LOG = LoggerFactory.getLogger(PrintBolt.class); OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { LOG.info(tuple.getString(0) + " Hello World!"); _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
三、實際運行
storm有本地模式和遠程模式。
1、本地模式
本地模式一般用於測試和開發階段,直接在Eclipse執行ExclamationTopology的main函數進行。
本地模式的代碼中有設置睡眠時間,到時間后主動kill topoloyg。
Utils.sleep(20000);
開始設置的時間是10S,運行log中沒有期待的輸出,反而出現以下錯誤。
org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid
0x15c8a2872ac000f, likely client has closed socket
at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.1.0.jar:1.1.0]
at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.1.0.jar:1.1.0]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
后面設置時間為20S,運行log中也有上面錯誤,但是有期待的輸出。
原因是機器比較慢,還沒初始化完就到時間跳出了,所以把睡眠時間設置大些。
2、遠程模式
集群模式需要先創建一個包含程序代碼以及代碼所依賴的依賴包的jar包(有關storm的jar包不用包括, 這些jar包會在工作節點上自動被添加到classpath里面去)。如果使用maven, 那么插件:Maven Assembly Plugin可以幫你打包,只要把下面的配置加入pom.xml。
<plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>cn.ljh.storm.helloworld.ExclamationTopology</mainClass> </manifest> </archive> </configuration> </plugin>
然后運行mvn assembly:assembly就可以打包了.
(1)用storm提交topology
storm jar storm-helloworld-0.0.1-SNAPSHOT-jar-with-dependencies.jar cn.ljh.storm.helloworld.ExclamationTopology ExclamationTest
運行提交命令后,出現如下log,說明提交成功。
查看集群的進程jps,兩個Supervisor節點出現了worker進程
在Nimbus節點的/usr/local/storm/data/nimbus/inbox下面有提交的jar
UI界面顯示提交topology
(2)終止一個topology
要終止一個topology, 執行:
storm kill {stormname}
其中{stormname}是提交topology給storm集群的時候指定的名字。
storm不會馬上終止topology。相反,它會先終止所有的spout,讓它們不再發射任何新的tuple, storm會等Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS秒之后才殺掉所有的工作進程。這會給topology足夠的時 間來完成所有我們執行storm kill命令的時候還沒完成的tuple。
(3)更新一個運行中的topology
為了更新一個正在運行的topology, 唯一的選擇是殺掉正在運行的topology然后重新提交一個新的。
至此HelloWorld示例完成。
四、常見配置
有很多topology級的配置可以設。 以”TOPOLOGY”打頭的配置是topology級別的配置,可以覆蓋全局級別的配置。下面是一些比較常見的:
1)Config.TOPOLOGY_WORKER設置: 這個設置用多少個工作進程來執行這個topology。比如,如果你把它設置成25, 那么集群里面一共會有25個java進程來執行這個topology的所有task。如果你的這個topology里面所有組件加起來一共有150的並行 度,那么每個進程里面會有6個線程(150 / 25 = 6)。
2)Config.TOPOLOGY_ACKERS: 這個配置設置acker線程的數目。Ackers是Storm的可靠性API的一部分。
3)Config.TOPOLOGY_MAX_SPOUT_PENDING: 這個設置一個spout task上面最多有多少個沒有處理的tuple(沒有ack/failed)回復, 我們推薦你設置這個配置,以防止tuple隊列爆掉。
4)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 這個配置storm的tuple的超時時間 – 超過這個時間的tuple被認為處理失敗了。這個設置的默認設置是30秒,對於大多數的topology都已經足夠了。
5)Config.TOPOLOGY_SERIALIZATIONS: 為了在你的tuple里面使用自定義類型,你可以用這個配置注冊自定義serializer。









