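Before you start learning JStorm you need a cluster environment. This post walks through setting up a single-node JStorm environment, intended for learning only. A production deployment is broadly similar, but for that you should consult the JStorm community and its documentation.
I. Prerequisites
JStorm's core code is written in Java, so it depends on a Java runtime. Some of JStorm's scripts are written in Python, so Python is required as well.
1. Java environment
2. Python environment
This post uses Java 1.6.0_35 and Python 2.6.5. If they are not installed by default, see the relevant documentation (www.java.com and www.python.org).
II. Version selection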
zeromq-3.2.4
zookeeper-3.4.5
jstorm-0.7.1
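III. Setting up the JStorm environment
Like Storm, JStorm relies on zeromq/jzmq for its low-level message transport. JStorm also uses zookeeper for shared state and coordination.
1. Install zeromq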
wget http://download.zeromq.org/zeromq-3.2.4.tar.gz
tar zxf zeromq-3.2.4.tar.gz
cd zeromq-3.2.4
./configure
make
sudo make install
sudo ldconfig
2. Install jzmq
wget https://github.com/zeromq/jzmq/tarball/master -O jzmq.tar.gz
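# GitHub tarballs unpack into a zeromq-jzmq-<commit> directory, so extract into a fixed directory name
mkdir jzmq
tar zxf jzmq.tar.gz -C jzmq --strip-components=1
cd jzmq
./autogen.sh
./configure
make
sudo make install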
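After installing, it can be useful to confirm that a JVM is able to load the jzmq native library. The following is a minimal sketch, not part of JStorm: the class name JzmqLoadCheck is hypothetical, and it assumes the native library is named "jzmq" (libjzmq.so on Linux), which is the name the jzmq Java binding loads.

```java
// Sketch: check whether the JVM can find a native library by name.
// Assumption: the jzmq binding's native library is called "jzmq" (libjzmq.so on Linux).
class JzmqLoadCheck {

    /** Returns true if the native library with the given name can be loaded. */
    static boolean canLoad(String libraryName) {
        try {
            System.loadLibrary(libraryName);
            return true;
        } catch (UnsatisfiedLinkError e) {
            // Not on java.library.path, or one of its own dependencies (e.g. libzmq) is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));
        System.out.println("jzmq loadable: " + canLoad("jzmq"));
    }
}
```

With a default ./configure prefix the library is installed under /usr/local/lib, which may not be on the JVM's default search path; in that case running the check as java -Djava.library.path=/usr/local/lib JzmqLoadCheck is one way to point the JVM at it.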
3. Install zookeeper
wget http://apache.dataguru.cn/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
tar zxf zookeeper-3.4.5.tar.gz
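cd zookeeper-3.4.5
# zkServer.sh reads conf/zoo.cfg, which the distribution ships only as a sample
cp conf/zoo_sample.cfg conf/zoo.cfg
# start once to verify the installation, then stop again
./bin/zkServer.sh start
./bin/zkServer.sh stop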
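While the server is running (that is, between the start and stop commands), its health can be checked with ZooKeeper's four-letter command ruok, to which a healthy server replies imok. The sketch below is one way to do that from Java with a plain socket. The class name ZkRuok is hypothetical, and port 2181 is assumed because it is ZooKeeper's default client port; adjust it if your zoo.cfg says otherwise.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch: ask a ZooKeeper server "ruok" and expect "imok" back.
class ZkRuok {

    /** Returns true only if the server at host:port answers "imok" within the timeout. */
    static boolean isOk(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);

            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes("US-ASCII"));
            out.flush();

            // Read up to four bytes of reply; the server closes the connection afterwards.
            InputStream in = socket.getInputStream();
            byte[] reply = new byte[4];
            int read = 0;
            while (read < reply.length) {
                int n = in.read(reply, read, reply.length - read);
                if (n < 0) {
                    break;
                }
                read += n;
            }
            return read == 4 && "imok".equals(new String(reply, "US-ASCII"));
        } catch (IOException e) {
            // Connection refused, timeout, or any other I/O failure counts as "not ok".
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        // Assumption: ZooKeeper listens on its default client port 2181 on this machine.
        System.out.println("ZooKeeper healthy: " + isOk("localhost", 2181, 2000));
    }
}
```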
4. Install jstorm
wget http://42.121.19.155/jstorm/jstorm-0.7.1.zip
unzip jstorm-0.7.1.zip
Edit the configuration file conf/storm.yaml:
storm.zookeeper.servers:
  - "localhost"
nimbus.host: "localhost"
storm.zookeeper.root: "/jstorm"
storm.local.dir: "/tmp/jstorm"
drpc.servers:
  - "localhost"
On a development machine that is short of memory, starting nimbus may fail with:
Error occurred during initialization of VM
Could not reserve enough space for object heap
To fix this, add the following to conf/storm.yaml:
nimbus.childopts: "-Xmx256m"
supervisor.childopts: "-Xmx256m"
worker.childopts: "-Xmx128m"
Adjust the heap sizes to suit your machine.
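5. UI
Prerequisite: Tomcat 7.0 or later.
Copy jstorm-ui-0.7.1.war into Tomcat's webapps directory.
6. Start JStorm
Start ZooKeeper: go to the ZooKeeper directory and run bin/zkServer.sh start
Start Nimbus: go to the JStorm directory and run bin/jstorm nimbus
Start Supervisor: go to the JStorm directory and run bin/jstorm supervisor
Start Tomcat: go to the Tomcat directory and run bin/startup.sh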
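The order above matters in practice: since JStorm coordinates through ZooKeeper, ZooKeeper presumably needs to be accepting connections before Nimbus and Supervisor are started. If you script the startup, a small helper that waits for a TCP port can make that explicit. This is a rough sketch with a hypothetical class name (WaitForPort); the ports are assumptions, namely ZooKeeper's default client port 2181 and the Nimbus Thrift port 6627 used by the HelloWorld topology later in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch: block until something is listening on a TCP port, or give up after a deadline.
class WaitForPort {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if close fails
            }
        }
    }

    /** Polls the port roughly every 500 ms until it accepts connections or maxWaitMillis has passed. */
    static boolean waitFor(String host, int port, long maxWaitMillis) {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (true) {
            if (isListening(host, port, 500)) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 2181 (ZooKeeper default client port), 6627 (Nimbus Thrift port).
        System.out.println("ZooKeeper up: " + waitFor("localhost", 2181, 30000));
        System.out.println("Nimbus up:    " + waitFor("localhost", 6627, 30000));
    }
}
```

A listening port only shows that the process is accepting connections, not that it is fully initialized, so treat the result as a hint rather than a guarantee.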
IV. JStorm HelloWorld
1. Write the source code
This example is taken from GitHub.
HelloWorldTopology.java
    package storm.cookbook;

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;

    /**
     * Author: ashrith
     * Date: 8/26/13
     * Time: 12:03 PM
     * Desc: set up the topology and submit it to either a local or remote Storm cluster depending on the
     *       arguments passed to the main method.
     */
    public class HelloWorldTopology {

        /*
         * main class in which to define the topology and a LocalCluster object (enables you to test and debug the
         * topology locally). In conjunction with the Config object, LocalCluster allows you to try out different
         * cluster configurations.
         *
         * Create a topology using 'TopologyBuilder' (which will tell storm how the nodes are arranged and how they
         * exchange data)
         * The spout and the bolts are connected using 'ShuffleGroupings'
         *
         * Create a 'Config' object containing the topology configuration, which is merged with the cluster
         * configuration at runtime and sent to all nodes with the prepare method
         *
         * Create and run the topology using 'createTopology' and 'submitTopology'
         */
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
            builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1).shuffleGrouping("randomHelloWorld");

            Config conf = new Config();
            conf.put(Config.NIMBUS_HOST, "localhost");
            conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                // args[0] is the topology name passed on the command line
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // no topology name given: nothing is submitted
            }
        }
    }
HelloWorldSpout.java
    package storm.cookbook;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    import java.util.Map;
    import java.util.Random;

    /**
     * Author: ashrith
     * Date: 8/21/13
     * Time: 8:33 PM
     * Desc: spout essentially emits a stream containing 1 of 2 sentences 'Other Random Word' or 'Hello World' based on
     *       random probability. It works by generating a random number upon construction and then generating subsequent
     *       random numbers to test against the original member variable's value. When it matches "Hello World" is emitted,
     *       during the remaining executions the other sentence is emitted.
     */
    public class HelloWorldSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private int referenceRandom;
        private static final int MAX_RANDOM = 10;

        public HelloWorldSpout() {
            final Random rand = new Random();
            referenceRandom = rand.nextInt(MAX_RANDOM);
        }

        /*
         * declareOutputFields() => you need to tell the Storm cluster which fields this Spout emits within the
         * declareOutputFields method.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }

        /*
         * open() => The first method called in any spout is 'open'
         * TopologyContext => contains all our topology data
         * SpoutOutputCollector => enables us to emit the data that will be processed by the bolts
         * conf => created in the topology definition
         */
        @Override
        public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /*
         * nextTuple() => Storm cluster will repeatedly call the nextTuple method which will do all the work of the spout.
         * nextTuple() must release the control of the thread when there is no work to do so that the other methods have
         * a chance to be called.
         */
        @Override
        public void nextTuple() {
            final Random rand = new Random();
            int instanceRandom = rand.nextInt(MAX_RANDOM);
            if (instanceRandom == referenceRandom) {
                collector.emit(new Values("Hello World"));
            } else {
                collector.emit(new Values("Other Random Word"));
            }
        }
    }
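The spout's emission rule can be tried out without a Storm cluster. Because both the reference number and each per-call number are drawn uniformly from 0 to MAX_RANDOM - 1, roughly one call in ten should produce "Hello World". The sketch below reproduces only that rule; the class SpoutLogicDemo and its method are hypothetical and use nothing from Storm.

```java
import java.util.Random;

// Sketch: the HelloWorldSpout emission rule in isolation, with no Storm dependency.
class SpoutLogicDemo {
    static final int MAX_RANDOM = 10;

    /** Simulates the given number of nextTuple() calls and counts the "Hello World" emissions. */
    static int countHelloWorld(Random rand, int calls) {
        // Chosen once, as in the spout's constructor.
        int referenceRandom = rand.nextInt(MAX_RANDOM);
        int helloWorld = 0;
        for (int i = 0; i < calls; i++) {
            // Drawn again on every simulated nextTuple() call.
            int instanceRandom = rand.nextInt(MAX_RANDOM);
            if (instanceRandom == referenceRandom) {
                helloWorld++;
            }
        }
        return helloWorld;
    }

    public static void main(String[] args) {
        int calls = 100000;
        int helloWorld = countHelloWorld(new Random(), calls);
        System.out.println(helloWorld + " of " + calls + " simulated tuples were \"Hello World\"");
    }
}
```

The exact count varies from run to run, but it should stay close to a tenth of the number of calls.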
HelloWorldBolt.java
    package storm.cookbook;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    import java.util.Map;

    /**
     * Author: ashrith
     * Date: 8/26/13
     * Time: 11:48 AM
     * Desc: This bolt will consume the produced Tuples from HelloWorldSpout and implement the required counting logic
     */
    public class HelloWorldBolt extends BaseRichBolt {
        private int myCount = 0;

        /*
         * prepare() => on create
         */
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        }

        /*
         * execute() => most important method in the bolt is execute(Tuple input), which is called once per tuple received
         * the bolt may emit several tuples for each tuple received
         */
        @Override
        public void execute(Tuple tuple) {
            String test = tuple.getStringByField("sentence");
            // Compare by content: a deserialized tuple value is not the same object as the literal.
            if ("Hello World".equals(test)) {
                myCount++;
                System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));
            }
        }

        /*
         * declareOutputFields => This bolt emits nothing hence no body for declareOutputFields()
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }
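The string comparison inside execute() deserves a note. In Java, == on two strings tests whether they are the same object, while equals compares their characters. A value that reaches a bolt after being serialized between workers is generally a different object from any literal in the bolt's source, so only a content comparison is reliable. The sketch below illustrates the difference; StringEqualityDemo is a hypothetical class, and new String(...) merely stands in for a deserialized value.

```java
// Sketch: reference comparison versus content comparison for strings.
class StringEqualityDemo {

    /** True only if both variables point at the very same String object. */
    static boolean sameReference(String a, String b) {
        return a == b;
    }

    /** True if both strings contain the same characters. */
    static boolean sameContent(String a, String b) {
        return a != null && a.equals(b);
    }

    public static void main(String[] args) {
        String literal = "Hello World";
        // Stand-in for a value rebuilt by deserialization: same characters, different object.
        String rebuilt = new String("Hello World");

        System.out.println("==     : " + sameReference(literal, rebuilt)); // prints "==     : false"
        System.out.println("equals : " + sameContent(literal, rebuilt));   // prints "equals : true"
    }
}
```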
2. Submit the topology
Compile the source above, package it as Helloworld.jar, and submit it to the JStorm cluster:
bin/jstorm jar Helloworld.jar storm.cookbook.HelloWorldTopology HelloWorld
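Here the last argument, HelloWorld, is the topology name.
3. Check the topology's status
You can check how the topology is running through the UI or by other means.
V. Conclusion
This post briefly covered setting up a single-node JStorm environment, so that beginners can get a single-node JStorm running and write a HelloWorld topology. Treat it only as a rough guide for production clusters; for detailed configuration, consult the relevant documentation.
VI. References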
[1]https://github.com/alibaba/jstorm/wiki
[2]https://github.com/nathanmarz/storm/wiki
http://hexiaoqiao.sinaapp.com/2014/06/09/jstorm%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA/