Storm Trident: Introduction and Usage


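I. Introduction to Trident

  "Trident" is the English word for a three-pronged spear. My understanding is this: the topology, spout, and bolt model we studied earlier is perfectly adequate for processing data, and Trident is a higher-level abstraction over spouts and bolts. It delivers the same functionality, but with more optimization and encapsulation built in. If your performance requirements are strict, plain spouts and bolts are recommended; otherwise Trident is a good choice.

  One way to picture Trident: a topology fans out by nature, and a spout feeding two bolts looks a bit like a trident. (This is just my personal interpretation.)

  Because Trident is a higher-level abstraction over Storm, it handles the data stream differently from the spout and bolt model covered earlier: Trident processes data one batch (a group of tuples) at a time.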
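
  To make "one batch at a time" concrete, here is a minimal plain-Java sketch of cutting a sequence of tuples into batches. It does not use Storm at all: BatchSketch and toBatches are hypothetical names chosen for illustration, and the batch size of 3 simply mirrors the value passed to FixedBatchSpout in the examples later in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; this is not the Storm API.
final class BatchSketch {

    // Split the tuples into consecutive batches of at most maxBatchSize elements.
    static <T> List<List<T>> toBatches(List<T> tuples, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < tuples.size(); start += maxBatchSize) {
            int end = Math.min(start + maxBatchSize, tuples.size());
            batches.add(new ArrayList<>(tuples.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> tuples = Arrays.asList("t1", "t2", "t3", "t4");
        // Four tuples with a batch size of 3 form two batches.
        System.out.println(toBatches(tuples, 3)); // prints [[t1, t2, t3], [t4]]
    }
}
```

  Every operation described in the following sections is applied to one such batch at a time rather than to individual tuples as they arrive.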

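II. Trident API operations

  Trident processes data in batches, and its API expresses each processing step as a function. The available operations include filter, sum, aggregator, and so on.

  Function operations are applied to the tuples in the stream.

  The commonly used Trident APIs are introduced below.

  1.each(Fields inputFields, Filter filter)
    Purpose: operates on every tuple in a batch; it is normally used together with a Filter or a Function.

  2.peek(Consumer action)
    Purpose: leaves the stream unchanged; it takes a Consumer and is typically used for side effects such as logging, much like System.out.println.

  3.partitionBy(Fields fields)
    Purpose: routes tuples to the next processing step according to the given fields; tuples with the same values for those fields are always sent to the same partition and are therefore handled by the same thread.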
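
  The semantics of each() can be sketched in plain Java. This is only an illustration under simplifying assumptions: EachSketch, eachFilter, and eachFunction are hypothetical names, tuples are modeled as lists of integers, and the function variant emits exactly one value per tuple (a real Trident Function may emit zero or more).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java illustration only; none of these names come from Storm.
final class EachSketch {

    // each(fields, filter): keep only the tuples for which the predicate holds.
    static List<List<Integer>> eachFilter(List<List<Integer>> batch,
                                          Predicate<List<Integer>> keep) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : batch) {
            if (keep.test(tuple)) {
                out.add(tuple);
            }
        }
        return out;
    }

    // each(fields, function, newField): append the computed value to every tuple.
    static List<List<Integer>> eachFunction(List<List<Integer>> batch,
                                            Function<List<Integer>, Integer> fn) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : batch) {
            List<Integer> extended = new ArrayList<>(tuple);
            extended.add(fn.apply(tuple));
            out.add(extended);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> batch = Arrays.asList(
                Arrays.asList(1, 4, 7, 10),
                Arrays.asList(1, 1, 3, 11),
                Arrays.asList(2, 2, 7, 1));
        // Keep tuples whose first two values have an even sum.
        System.out.println(eachFilter(batch, t -> (t.get(0) + t.get(1)) % 2 == 0));
        // prints [[1, 1, 3, 11], [2, 2, 7, 1]]
        // Append the sum of the first two values as a new trailing field.
        System.out.println(eachFunction(batch, t -> t.get(0) + t.get(1)));
        // prints [[1, 4, 7, 10, 5], [1, 1, 3, 11, 2], [2, 2, 7, 1, 4]]
    }
}
```

  The point to notice is that a filter never changes a tuple, it only decides whether the tuple stays in the batch, while a function keeps the original fields and adds new ones.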

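III. Common Trident functions

  1.FilterFunction (filtering)
    Purpose: filters the tuples in a batch.
    Implementation: write a class that extends BaseFilter, override isKeep(), and pass an instance to each().

  2.SumFunction (summing)
    Purpose: adds or subtracts values in the stream.
    Implementation: write a class that extends BaseFunction, override execute(), and pass an instance to each().

  3.MapFunction (one-to-one function)
    Purpose: applies a custom operation to a single tuple.
    Implementation: write a class that implements the MapFunction interface, override execute(), and pass an instance to map().

  4.ProjectionFunction (projection)
    Purpose: keeps only the specified fields of the stream.
    Implementation: list the fields you need in project().
    Example: given a Stream with the fields ["x","y","z"], the projection project(new Fields("y", "z")) produces a stream that contains only the fields ["y","z"].

  5.repartition (repartitioning)
    Purpose: repartitioning decides which of the following strategies routes tuples to the next stage.
      shuffle:     balances tuples across the partitions with a random assignment algorithm.
      broadcast:   replicates every tuple to all partitions. This is very useful with DRPC, for example when running a stateQuery on every partition.
      partitionBy: partitions by the given list of fields. The hash of those field values is taken modulo the number of partitions, which guarantees that tuples with the same field values land in the same partition.
      global:      sends all tuples to one and the same partition, which handles the tuples of the entire Stream; that partition runs in its own, separately started thread.
      batchGlobal: sends all tuples of one batch to the same partition; different batches may go to different partitions.
      partition:   partitions with a custom function that implements backtype.storm.grouping.CustomStreamGrouping (org.apache.storm.grouping.CustomStreamGrouping in Storm 1.0 and later).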
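
  The routing rules above can be sketched in a few lines of plain Java. This is an illustration, not Storm's implementation: RepartitionSketch and its methods are hypothetical, partitions are modeled as indexes 0..n-1, and picking index 0 for global is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; these helpers are hypothetical and are not Storm classes.
final class RepartitionSketch {

    // partitionBy: hash the selected field values, then take the result
    // modulo the partition count. floorMod keeps the index non-negative.
    static int partitionBy(List<Object> fieldValues, int numPartitions) {
        return Math.floorMod(fieldValues.hashCode(), numPartitions);
    }

    // broadcast: every tuple goes to every partition.
    static List<Integer> broadcast(int numPartitions) {
        List<Integer> targets = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            targets.add(p);
        }
        return targets;
    }

    // global: every tuple goes to one fixed partition (index 0 is an arbitrary choice here).
    static int global() {
        return 0;
    }

    public static void main(String[] args) {
        int n = 3;
        int first = partitionBy(Arrays.<Object>asList("java"), n);
        int second = partitionBy(Arrays.<Object>asList("java"), n);
        System.out.println(first == second); // prints true: equal field values, same partition
        System.out.println(broadcast(n));    // prints [0, 1, 2]
        System.out.println(global());        // prints 0
    }
}
```

  The property that matters for partitionBy is determinism: the same field values always map to the same partition, which is what makes per-key processing on a single task possible.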

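  6.Aggregation
    Trident processes data in batches, so aggregation is also performed over the tuples of a batch. The tuples emitted by an aggregation replace the input tuples, so the original data no longer flows downstream.
    The Aggregator interface has three methods to implement:
      init()     : called at the start of a batch, before any of its tuples are processed; it creates the initial aggregation state.
      aggregate(): called for every tuple in the batch. Note that calling aggregate() on a Stream with an Aggregator is a repartitioning operation: the batch is first routed to a single partition, and the aggregation runs there.
      complete() : called at the end of a batch.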
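
    The call order of the three methods can be sketched in plain Java. The BatchAggregator interface below is a hypothetical, simplified stand-in for the contract just described (the real Storm interface also receives a batch id and a collector); it only shows that init runs once, aggregate runs once per tuple, and complete runs once at the end.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; BatchAggregator is a hypothetical stand-in, not a Storm type.
final class AggregatorLifecycleSketch {

    interface BatchAggregator<S, T, R> {
        S init();                         // once, before the first tuple of the batch
        void aggregate(S state, T tuple); // once for every tuple in the batch
        R complete(S state);              // once, after the last tuple of the batch
    }

    // Mutable state that lives for the duration of one batch.
    static final class CountState {
        long count;
    }

    // Counts the tuples of a batch.
    static final class CountTuples<T> implements BatchAggregator<CountState, T, Long> {
        @Override
        public CountState init() {
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, T tuple) {
            state.count++;
        }

        @Override
        public Long complete(CountState state) {
            return state.count;
        }
    }

    // Drives one batch through the init / aggregate / complete sequence.
    static <S, T, R> R runBatch(BatchAggregator<S, T, R> aggregator, List<T> batch) {
        S state = aggregator.init();
        for (T tuple : batch) {
            aggregator.aggregate(state, tuple);
        }
        return aggregator.complete(state);
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("java", "scala", "scala");
        System.out.println(runBatch(new CountTuples<String>(), batch)); // prints 3
    }
}
```

    Because a fresh state object is created by init() for every batch, nothing carries over from one batch to the next in this model.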

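    6.1 partitionAggregate
      Runs the aggregation on each partition of a batch. It is not a repartitioning operation: it aggregates only the tuples of the batch that are already on the current partition.

    6.2 aggregate
      Aggregates the tuples of a whole batch.

    6.3 ReducerAggregator
      Folds the n-th field of every tuple in a batch into a single value.

    6.4 CombinerAggregator
      Aggregates the tuples of a batch; it is a repartitioning operation.

    6.5 persistentAggregate
      The persistent aggregator: it aggregates the tuples and stores the running result in a State, so the result is kept across batches.

    6.6 Aggregator chain (AggregateChina in the example code)
      Applies several aggregations to the same batch of tuples.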
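
    The difference between the reducer style and the combiner style can be sketched in plain Java. This is an illustration with hypothetical names (CombineReduceSketch and its methods), using integer sums only: the reducer folds tuples one by one from an initial value, while the combiner merges values pairwise, which allows partial results to be computed per partition and merged afterwards.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; these methods mimic the roles of init/reduce and
// init/combine/zero but are not Storm interfaces.
final class CombineReduceSketch {

    // Reducer style: start from an initial value and fold in one tuple at a time.
    static int reduceSum(List<Integer> batch) {
        int curr = 0;                 // role of init()
        for (int value : batch) {
            curr = curr + value;      // role of reduce(curr, tuple)
        }
        return curr;
    }

    // Combiner style: merge two partial values.
    static int combine(int a, int b) {
        return a + b;
    }

    // Combiner style: the value used when there is nothing to combine.
    static int zero() {
        return 0;
    }

    // Partial aggregation over the tuples that sit on one partition.
    static int combinePartition(List<Integer> partition) {
        int acc = zero();
        for (int value : partition) {
            acc = combine(acc, value); // value plays the role of init(tuple)
        }
        return acc;
    }

    // Merge the per-partition partial results into the final value.
    static int combineAll(List<List<Integer>> partitions) {
        int acc = zero();
        for (List<Integer> partition : partitions) {
            acc = combine(acc, combinePartition(partition));
        }
        return acc;
    }

    public static void main(String[] args) {
        // One batch with the values 1, 2, 3, spread over two partitions.
        System.out.println(reduceSum(Arrays.asList(1, 2, 3))); // prints 6
        System.out.println(combineAll(Arrays.asList(
                Arrays.asList(1),
                Arrays.asList(2, 3))));                         // prints 6
    }
}
```

    Both styles give the same total here; the combiner form simply makes it possible to shrink the data on each partition before it has to be moved.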

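  7.GroupBy
    groupBy splits the whole stream into grouped streams according to the specified fields. An aggregation applied to a grouped stream runs within each group rather than over the whole batch. If groupBy is followed by aggregate, an aggregation is performed; if it is followed by partitionAggregate, it is not.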
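
    Aggregating "within each group rather than over the whole batch" can be sketched in plain Java. GroupBySketch and countByGroup are hypothetical names, and each tuple is reduced to the value of its grouping field; the sketch counts the tuples of one batch per group.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not the Storm API.
final class GroupBySketch {

    // Count the tuples of one batch separately for every value of the grouping field.
    static Map<String, Long> countByGroup(List<String> groupKeysOfBatch) {
        // LinkedHashMap keeps the groups in first-seen order, so the output is deterministic.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : groupKeysOfBatch) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // One batch of three tuples, grouped by name.
        List<String> batch = Arrays.asList("java", "scala", "scala");
        System.out.println(countByGroup(batch)); // prints {java=1, scala=2}
    }
}
```

    Without the grouping step the same batch would produce a single count of 3; with it, one result is produced per group.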

     

IV. Examples of common Trident functions

  1.FilterFunction

    Requirement: from a set of tuples, keep those in which the sum of the first and second values is even.

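// Imports assume Storm 1.x, where the classes live under org.apache.storm;
// the later examples in this section use the same packages.
import java.util.concurrent.TimeUnit;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FilterTrident {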
    private static final Logger LOG = LoggerFactory.getLogger(FilterTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3, 
                                new Values(1,4,7,10),
                                new Values(1,1,3,11),
                                new Values(2,2,7,1), 
                                new Values(2,5,7,2));
        spout.setCycle(false);
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        // peek: leaves the stream unchanged; its argument is a Consumer
        // each: operates on the specified fields of every tuple from the spout
        topology.newStream("filter", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(input -> LOG.info("peek1 ================{},{},{},{}",input.get(0),input.get(1),input.get(2),input.get(3)))
                .parallelismHint(2)
                .localOrShuffle()
                .each(new Fields("a","b"),new CheckEvenSumFilter())
                .parallelismHint(2)
                .localOrShuffle()
                .peek(input -> LOG.info("peek2 +++++++++++++++++++{},{},{},{}",
                    input.getIntegerByField("a"),input.getIntegerByField("b"),
                    input.getIntegerByField("c"),input.getIntegerByField("d"))
                ).parallelismHint(1);
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("FilterTrident", conf, topology.build());
        
        LOG.warn("==================================================");
        LOG.warn("the LocalCluster topology {} is submitted.","FilterTrident");
        LOG.warn("==================================================");
        
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("FilterTrident");
        cluster.shutdown();
    }
    
    private static class CheckEvenSumFilter extends BaseFilter{

        @Override
        public boolean isKeep(TridentTuple tuple) {
            Integer a = tuple.getIntegerByField("a");
            Integer b = tuple.getIntegerByField("b");
            
            return (a + b) % 2 == 0;
        }
    }
}

  2.SumFunction

    Requirement: sum the first two values of every tuple.

public class SumFunctionTrident {

    private static final Logger LOG = LoggerFactory.getLogger(SumFunctionTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3, 
                                new Values(1,4,7,10),
                                new Values(1,1,3,11),
                                new Values(2,2,7,1), 
                                new Values(2,5,7,2));
        spout.setCycle(false);
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        // peek: leaves the stream unchanged; its argument is a Consumer
        // each: operates on the specified fields of every tuple from the spout
        topology.newStream("function", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(input -> LOG.info("peek1 ================{},{},{},{}",input.get(0),input.get(1),input.get(2),input.get(3)))
                .parallelismHint(2)
                .localOrShuffle()
                .each(new Fields("a","b"),new SumFunction(),new Fields("sum"))
                .parallelismHint(2)
                .localOrShuffle()
                .peek(input -> LOG.info("peek2 ================{},{},{},{},{}",
                            input.getIntegerByField("a"),input.getIntegerByField("b"),input.getIntegerByField("c"),input.getIntegerByField("d"),input.getIntegerByField("sum")))
                .parallelismHint(1);
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("SumFunctionTrident", conf, topology.build());
        
        LOG.warn("==================================================");
        LOG.warn("the LocalCluster topology {} is submitted.","SumFunctionTrident");
        LOG.warn("==================================================");
        
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("SumFunctionTrident");
        cluster.shutdown();
    }
    
    private static class SumFunction extends BaseFunction{

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            Integer a = tuple.getIntegerByField("a");
            Integer b = tuple.getIntegerByField("b");
            collector.emit(new Values(a+b));
        }
    }
}

  3.MapFunction

    Requirement: convert the text of every tuple in a batch to upper case.

public class MapFunctionTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(MapFunctionTrident.class);
        
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        
        boolean isRemoteMode = false;
        if(args.length > 0){
            isRemoteMode = true;
        }
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("line"),3, 
                                    new Values("hello stream"),
                                    new Values("hello kafka"),
                                    new Values("hello hadoop"),
                                    new Values("hello scala"),
                                    new Values("hello java")
                                    );
        spout.setCycle(true);
        TridentTopology topology = new TridentTopology();
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);
        
        topology.newStream("hello", spout).parallelismHint(1)
                .localOrShuffle()
                .map(new MyMapFunction(),new Fields("upper"))
                .parallelismHint(2)
                .partition(Grouping.fields(ImmutableList.of("upper")))
                .peek(input ->LOG.warn("================>> peek process value:{}",input.getStringByField("upper")))
                .parallelismHint(3);
        if(isRemoteMode){
            StormSubmitter.submitTopology("HelloTridentTopology", conf, topology.build());
            LOG.warn("==================================================");
            LOG.warn("the remote topology {} is submitted.","HelloTridentTopology");
            LOG.warn("==================================================");
        }else{
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HelloTridentTopology", conf, topology.build());
            
            LOG.warn("==================================================");
            LOG.warn("the LocalCluster topology {} is submitted.","HelloTridentTopology");
            LOG.warn("==================================================");
            
            TimeUnit.SECONDS.sleep(5);
            cluster.killTopology("HelloTridentTopology");
            cluster.shutdown();
        }
        
    }
    
    private static class MyMapFunction implements MapFunction{
        private static final Logger LOG = LoggerFactory.getLogger(MyMapFunction.class);
        
        @Override
        public Values execute(TridentTuple input) {
            String line = input.getStringByField("line");
            LOG.warn("================>> myMapFunction process execute:value :{}",line);
            return new Values(line.toUpperCase());
        }
    }
}

  4.ProjectionFunctionTrident

    Requirement: keep only some of the fields of every tuple.

public class ProjectionFunctionTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(ProjectionFunctionTrident.class);
    
    public static void main(String [] args) throws InterruptedException{
        
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("x","y","z"), 3, 
                new Values(1,2,3),
                new Values(4,5,6),
                new Values(7,8,9), 
                new Values(10,11,12)
        );
        spout.setCycle(false);
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        topology.newStream("ProjectionTrident", spout).parallelismHint(1)
                .localOrShuffle().peek(tridentTuple ->LOG.info("================ {}",tridentTuple)).parallelismHint(2)
                .shuffle()
                .project(new Fields("y","z")).parallelismHint(2)
                .localOrShuffle().peek(tridentTuple ->LOG.info(">>>>>>>>>>>>>>>> {}",tridentTuple)).parallelismHint(2);
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("ProjectionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("ProjectionTrident");
        cluster.shutdown();
    }
}

  5.2 Broadcast

    Requirement: send the tuples of a batch to every partition.

public class BroadcastRepartitionTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(BroadcastRepartitionTrident.class);
    
    public static void main(String [] args) throws InterruptedException{
        
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("hadoop",3),
                new Values("java",4),
                new Values("hadoop",5)
        );
        spout.setCycle(false);
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        topology.newStream("BroadcastRepartitionTrident", spout).parallelismHint(1)
                .broadcast().peek(tridentTuple ->LOG.info("================ {}",tridentTuple))
                .parallelismHint(2);    
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("BroadcastRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("BroadcastRepartitionTrident");
        cluster.shutdown();
    }
}

  5.3 PartitionBy

    Requirement: route the tuples of a batch by the configured field, so that tuples with the same value are executed in the same task.

public class PartitionByRepartitionTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(PartitionByRepartitionTrident.class);
    
    public static void main(String [] args) throws InterruptedException{
        
        @SuppressWarnings("unchecked")
        // FixedBatchSpout() parameters:
        //    1. the field names of the spout
        //    2. how many tuples make up one batch
        //    3. the field values
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",23),
                new Values("scala",3),
                new Values("hadoop",10),
                new Values("java",23),
                new Values("hadoop",10)
        );
        spout.setCycle(false);
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                .partitionBy(new Fields("language")).peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                .parallelismHint(3);
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("PartitionByRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("PartitionByRepartitionTrident");
        cluster.shutdown();
    }
}

  5.4 Global

    Requirement: send all tuples of a batch to one global partition for processing.

public class GlobalRepatitionTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(GlobalRepatitionTrident.class);
    
    public static void main(String [] args) throws InterruptedException{
        
        @SuppressWarnings("unchecked")
        // FixedBatchSpout() parameters:
        //    1. the field names of the spout
        //    2. how many tuples make up one batch
        //    3. the field values
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",23),
                new Values("scala",3),
                new Values("hadoop",10),
                new Values("java",23),
                new Values("hadoop",10)
        );
        spout.setCycle(false);
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        topology.newStream("GlobalRepatitionTrident", spout).parallelismHint(1)
                    .partitionBy(new Fields("language"))
                    .parallelismHint(3)    // whatever parallelism is set here, it has no effect on the global() step below
                    .peek(tridentTuple ->LOG.info(" ================= {}",tridentTuple))
                    .global()
                    .peek(tridentTuple ->LOG.info(" >>>>>>>>>>>>>>>>> {}",tridentTuple));
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("GlobalRepatitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("GlobalRepatitionTrident");
        cluster.shutdown();
    }
}

  5.5 batchGlobal

    Requirement: send the tuples of different batches to different tasks.

public class BatchGlobalRepatitionTrident2 {
    
    private static final Logger LOG = LoggerFactory.getLogger(BatchGlobalRepatitionTrident2.class);
    
    public static void main(String [] args) throws InterruptedException{
        
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        spout.setCycle(false);
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        topology.newStream("BatchGlobalRepatitionTrident2", spout).parallelismHint(1)
                .batchGlobal().peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                .parallelismHint(3);    
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("BatchGlobalRepatitionTrident2", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("BatchGlobalRepatitionTrident2");
        cluster.shutdown();
    }
}

  5.6 partition

    Requirement: partition with a custom grouping.

public class CustomRepartitionTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(CustomRepartitionTrident.class);
    
    public static void main(String [] args) throws InterruptedException{
        
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("hadoop",3),
                new Values("java",4),
                new Values("hadoop",5)
        );
        spout.setCycle(false);
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        
        TridentTopology topology = new TridentTopology();
        topology.newStream("CustomRepartitionTrident", spout).parallelismHint(1)
                .partition(new HighTaskIDGrouping()).peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                .parallelismHint(2);    
                
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("CustomRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("CustomRepartitionTrident");
        cluster.shutdown();
    }
}

/**
 * Custom grouping:
 *         always routes tuples to the task with the highest task ID.
 * @author pengbo.zhao
 *
 */
public class HighTaskIDGrouping implements CustomStreamGrouping{
    
    private int taskID;
    
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        // targetTasks: the IDs of all downstream tasks
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);        // sort in ascending order
        this.taskID = tasks.get(tasks.size() -1);
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
         
        return Arrays.asList(taskID);
    }
}

  6.1 partitionAggregate

    Requirement: count the tuples in a batch.

public class PartitionAggregateTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(PartitionAggregateTrident.class);
    
    private FixedBatchSpout spout;
    
    @SuppressWarnings("unchecked")
    @Before
    public void setSpout(){
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        this.spout.setCycle(false);
    }
    
    @Test
    public void testPartitionAggregtor(){
        
        TridentTopology topoloty = new TridentTopology();
        topoloty.newStream("PartitionAggregateTrident", spout).parallelismHint(2) // the internal setting is 1, so specifying 2 here has no effect
                .shuffle()
                .partitionAggregate(new Fields("name","age"), new Count(),new Fields("count"))
                .parallelismHint(2)
//                .each(new Fields("count"),new Debug());
                .peek(input ->LOG.info(" >>>>>>>>>>>>>>>>> {}",input.getLongByField("count")));
        
        this.submitTopology("PartitionAggregateTrident", topoloty.build());
    }
        
    public void submitTopology(String name,StormTopology topology) {
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }
    
    public Config createConf(){
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}

  6.2 aggregate

    Requirement: count the tuples of a batch.

public class AggregateTrident {
    
    private static final Logger LOG = LoggerFactory.getLogger(AggregateTrident.class);
    
    private FixedBatchSpout spout;
    
    @SuppressWarnings("unchecked")
    @Before
    public void setSpout(){
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        this.spout.setCycle(false);
    }
    
    @Test
    public void testPartitionAggregtor(){
        
        TridentTopology topoloty = new TridentTopology();
        topoloty.newStream("AggregateTrident", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("name","age"), new Count(),new Fields("count"))
//                .aggregate(new Fields("name","age"), new CountAsAggregator(),new Fields("count"))
                .parallelismHint(2)
                .each(new Fields("count"),new Debug())
                .peek(input -> LOG.info("============> count:{}",input.getLongByField("count")));
        
        this.submitTopology("AggregateTrident", topoloty.build());
    }
        
    public void submitTopology(String name,StormTopology topology) {
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }
    
    public Config createConf(){
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}

  6.3 ReducerAggregator

    Requirement: sum element 0 of the tuples in a batch. In other words, however many tuples the batch contains, add up the specified field across all of them.

public class ReduceAggregatorTrident  {
    
    private FixedBatchSpout spout;
    
    @SuppressWarnings("unchecked")
    @Before
    public void setSpout(){
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        this.spout.setCycle(false);
    }
    
    @Test
    public void testReduceAggregator(){
        
        TridentTopology topoloty = new TridentTopology();
        topoloty.newStream("ReduceAggregator", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("age","name"), new MyReduce(),new Fields("sum"))
                .parallelismHint(5)
                .each(new Fields("sum"),new Debug()); 
        
        this.submitTopology("ReduceAggregator", topoloty.build());
    }
        
    public void submitTopology(String name,StormTopology topology) {
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }
    
    public Config createConf(){
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
    
    static class MyReduce implements ReducerAggregator<Integer>{
        
        @Override
        public Integer init() {
            return 0;    // the initial value is 0
        }

        @Override
        public Integer reduce(Integer curr, TridentTuple tuple) {
            
            return curr + tuple.getInteger(0);
        }
    }
}

  6.4 CombinerAggregator

    Requirement: sum one field across the tuples.

public class CombinerAggregate {
    
    private FixedBatchSpout spout;
    
    @SuppressWarnings("unchecked")
    @Before
    public void setSpout(){
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        this.spout.setCycle(false);
    }
    
    @Test
    public void testCombinerAggregate(){
        
        TridentTopology topoloty = new TridentTopology();
        topoloty.newStream("CombinerAggregate", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("age"), new MyCount(),new Fields("count"))
                .parallelismHint(5)
                .each(new Fields("count"),new Debug());
        this.submitTopology("CombinerAggregate", topoloty.build());
    }
        
    public void submitTopology(String name,StormTopology topology) {
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }
    
    public Config createConf(){
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
    
    static class MyCount implements CombinerAggregator<Integer>{

        @Override
        public Integer init(TridentTuple tuple) {
            return tuple.getInteger(0);
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return val1 + val2;
        }

        @Override
        public Integer zero() {
            return 0;
        }
    }
}

  6.5 persistentAggregate

    Requirement: count the tuples of the batches and keep the result in a State.

public class PersistenceAggregator {
    
    private static final Logger LOG = LoggerFactory.getLogger(PersistenceAggregator.class);
    
    private FixedBatchSpout spout;
    
    @SuppressWarnings("unchecked")
    @Before
    public void setSpout(){
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        this.spout.setCycle(false);
    }
    
    @Test
    public void testPersistenceAggregator(){
        
        TridentTopology topoloty = new TridentTopology();
        topoloty.newStream("testPersistenceAggregator", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("name"), new Count(),new Fields("count"))
                .parallelismHint(4)
                .newValuesStream()
                .peek(input ->LOG.info("count:{}",input.getLongByField("count")));
        this.submitTopology("testPersistenceAggregator", topoloty.build());
    }
        
    public void submitTopology(String name,StormTopology topology) {
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }
    
    public Config createConf(){
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
    
}

  6.6 Aggregator chain (class AggregateChina)

    Requirement: run a count, a sum, and a second count over the tuples of a batch.

public class AggregateChina {
    private static final Logger LOG = LoggerFactory.getLogger(AggregateChina.class);
    
    private FixedBatchSpout spout;
    
    @SuppressWarnings("unchecked")
    @Before
    public void setSpout(){
        this.spout = new FixedBatchSpout(new Fields("name","age"),3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        this.spout.setCycle(false);
    }
    
    @Test
    public void testAggregateChina(){
        
        TridentTopology topoloty = new TridentTopology();
        topoloty.newStream("AggregateChina", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .chainedAgg()
                .aggregate(new Fields("name"),new Count(), new Fields("count"))
                .aggregate(new Fields("age"),new Sum(), new Fields("sum"))
                .aggregate(new Fields("age"),new Count(), new Fields("count2"))
                .chainEnd()
                .peek(tuple->LOG.info("{}",tuple));
        this.submitTopology("AggregateChina", topoloty.build());
    }
        
    public void submitTopology(String name,StormTopology topology) {
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }
    
    public Config createConf(){
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}

  7.GroupBy

    Requirement: group the tuples of a batch by name, then count the tuples in each group.

public class GroupBy {
    
    private static final Logger LOG = LoggerFactory.getLogger(GroupBy.class);
    
    private FixedBatchSpout spout;
    
    @SuppressWarnings("unchecked")
    @Before
    public void setSpout(){
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6)
        );
        this.spout.setCycle(false);
    }
    
    @Test
    public void testGroupBy(){
        
        TridentTopology topoloty = new TridentTopology();
        topoloty.newStream("GroupBy", spout).parallelismHint(1)
//                .partitionBy(new Fields("name"))
                .groupBy(new Fields("name"))
                .aggregate(new Count(), new Fields("count"))
                .peek(tuple -> LOG.info("{},{}",tuple.getFields(),tuple));
                
        this.submitTopology("GroupBy", topoloty.build());
    }
        
    public void submitTopology(String name,StormTopology topology) {
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }
    
    public Config createConf(){
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}