在Storm的Toplogy中設置多數據源Spout


上代碼:主要看main方法中的設置.   如下代碼是一般情況下的設置方法...Trident中設置多數據源看對應的博客總結

  1 /**
  2  * 指定多個數據源
  3  * 數字累加求和
  4  * 先添加storm依賴
  5  */
  6 public class LocalTopologyMeger {
  7     /**
  8      * spout需要繼承baserichspout,實現未實現的方法
  9      * @author Administrator
 10      *
 11      */
 12     public static class MySpout extends BaseRichSpout{
 13         private Map conf;
 14         private TopologyContext context;
 15         private SpoutOutputCollector collector;
 16         
 17         /**
 18          * 初始化方法,只會執行一次
 19          * 在這里面可以寫一個初始化的代碼
 20          * Map conf:其實里面保存的是topology的一些配置信息
 21          * TopologyContext context:topology的上下文,類似於servletcontext
 22          * SpoutOutputCollector collector:發射器,負責向外發射數據(tuple)
 23          */
 24         @Override
 25         public void open(Map conf, TopologyContext context,
 26                 SpoutOutputCollector collector) {
 27             this.conf = conf;
 28             this.context = context;
 29             this.collector = collector;
 30         }
 31 
 32         int num = 1;
 33         /**
 34          * 這個方法是spout中最重要的方法,
 35          * 這個方法會被storm框架循環調用,可以理解為這個方法是在一個while循環之內
 36          * 每調用一次,會向外發射一條數據
 37          */
 38         @Override
 39         public void nextTuple() {
 40             System.out.println("spout發射:"+num);
 41             //把數據封裝到values中,稱為一個tuple,發射出去
 42             this.collector.emit(new Values(num++));
 43             Utils.sleep(1000);
 44         }
 45         
 46         /**
 47          * 聲明輸出字段
 48          */
 49         @Override
 50         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 51             //給values中的數據起個名字,方便后面的bolt從這個values中取數據
 52             //fields中定義的參數和values中傳遞的數值是一一對應的
 53             declarer.declare(new Fields("num"));
 54         }
 55         
 56     }
 57     
 58     
 59     /**
 60      * 自定義bolt需要實現baserichbolt
 61      * @author Administrator
 62      *
 63      */
 64     public static class MyBolt extends BaseRichBolt{
 65         private Map stormConf; 
 66         private TopologyContext context;
 67         private OutputCollector collector;
 68         
 69         /**
 70          * 和spout中的open方法意義一樣
 71          */
 72         @Override
 73         public void prepare(Map stormConf, TopologyContext context,
 74                 OutputCollector collector) {
 75             this.stormConf = stormConf;
 76             this.context = context;
 77             this.collector = collector;
 78         }
 79 
 80         int sum = 0;
 81         /**
 82          * 是bolt中最重要的方法,當spout發射一個tuple出來,execute也會被調用,需要對spout發射出來的tuple進行處理
 83          */
 84         @Override
 85         public void execute(Tuple input) {
 86             //input.getInteger(0);//也可以根據角標獲取tuple中的數據
 87             Integer value = input.getIntegerByField("num");
 88             sum+=value;
 89             System.out.println("和:"+sum);
 90         }
 91         
 92         /**
 93          * 聲明輸出字段
 94          */
 95         @Override
 96         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 97             //在這沒必要定義了,因為execute方法中沒有向外發射tuple,所以就不需要聲明了。
 98             //如果nextTuple或者execute方法中向外發射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明
 99         }
100         
101     }
102     /**
103      * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統保留的
104      * @param args
105      */
106     public static void main(String[] args) {
107         //組裝topology
108         TopologyBuilder topologyBuilder = new TopologyBuilder();
109         topologyBuilder.setSpout("spout1", new MySpout());
110         topologyBuilder.setSpout("spout2", new MySpout());
111         //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發射出來的tuple
112         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1").shuffleGrouping("spout2");
113         
114         //創建本地storm集群
115         LocalCluster localCluster = new LocalCluster();
116         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
117     }
118 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM