storm的DRPC模式的作用是實現從遠程調用storm集群的計算資源,而不需要連接到集群的某一個節點。OK。那么storm實現DRPC主要是使用LinearDRPCTopologyBuilder這個類。下面就先來看看一個簡單的例子,它的源碼的github上。
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class BasicDRPCTopology { public static class ExclaimBolt extends BaseBasicBolt { //主要需要覆寫execute方法和declareoutputfields方法 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");//實現DRPC模式 builder.addBolt(new ExclaimBolt(), 3); Config conf = new Config(); if (args == null || args.length == 0) { LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); for (String word : new String[]{ "hello", "goodbye" }) { System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word)); } cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } } }
這段代碼主要實現的功能是給接收到的每一個輸入后面添加一個感嘆號。ok,這樣就可以編譯提交了。
不過在這之前需要先配置storm集群的drpc server的ip。如圖。主要是下面的server的ip需要配置好。並且集群的每一個節點的配置文件都需要配置這項參數!
然后即可使用storm drpc &命令啟動drpc模式。(這里的分工是172.17.150.6為客戶端,其余的172.17.150.7(.8,.11)為集群的三個節點,.11是nimbus節點。)
OK,那接下來就使用客戶端向集群提交Topology。如圖。使用客戶端向集群提交名為exclaim的Topology。里面設置的worker數為3。
從下圖可以看到兩個supervisor分別有一個是運行兩個worker,有一個是運行一個worker。
ok,下面是客戶端調用遠程資源進行計算的程序。主要是聲明DRPCClient的ip以及端口,以及指定執行的方法名和傳入的參數(client.execute("exclamation",word))。
運行結果如下。
OK,整個DRPC的過程就是這樣。
謝謝大家!本人水平有限,請不吝指正!