在上一篇博客中,我們通過Storm.Net.Adapter創建了一個使用Csharp編寫的Storm Topology - wordcount。本文將介紹如何編寫Java端的程序以及如何發布到測試的Storm環境中運行。
如果你覺得對你有幫助,歡迎Star和Fork,讓更多人看到來幫助完善這個項目。
STEP1: 克隆storm官方示例項目 storm-starter:
$ git clone git://github.com/apache/storm.git && cd storm/examples/storm-starter
STEP2: 增加csharp的多語言支持:
將上一篇博客 使用Csharp創建你的第一個Storm拓撲 中完成的項目編譯,把生產的組件拷貝到 /multilang/resources/
文件夾中。
STEP3:使用JAVA創建Topology:
在 /src/jvm/storm/starter/
新增 WordCountTopologyCsharp.java
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.ShellSpout; import backtype.storm.task.ShellBolt; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import java.util.Map; /** * This topology demonstrates Storm's stream groupings and multilang capabilities. */ public class WordCountTopologyCsharp { public static class Generator extends ShellSpout implements IRichSpout { public Generator() { super("cmd", "/k", "CALL", "StormSimple.exe", "generator"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } public static class Splitter extends ShellBolt implements IRichBolt { public Splitter() { super("cmd", "/k", "CALL", "StormSimple.exe", "splitter"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } public static class Counter extends ShellBolt implements IRichBolt { public Counter(){ super("cmd", "/k", "CALL", "StormSimple.exe", "counter"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } @Override public Map<String, Object> getComponentConfiguration() { return null;