I. Introduction to Trident
The English word "trident" means a three-pronged spear. As we saw earlier, processing data with a topology built from spouts and bolts works perfectly well; Trident is a higher-level abstraction over spouts and bolts that achieves the same functionality, just with more optimization and encapsulation built in. If you have strict performance requirements, it is better to work with spouts and bolts directly; otherwise Trident is a good fit.
You can also picture it this way: a topology naturally fans out, and a single spout feeding two bolts looks rather like a trident. (My own interpretation.)
Because Trident is a higher-level abstraction over Storm, it handles streams differently from the spouts and bolts covered before: Trident processes data one batch (a group of tuples) at a time.
II. Trident API Operations
Trident processes data in batches, and its API expresses data-processing operations as functions, such as filter, sum, and aggregator.
These function operations all act on the tuples in the stream.
The commonly used Trident APIs are introduced below.
1. each(Fields inputFields, Filter filter)
Purpose: operates on the content of every tuple in a batch; normally used together with a Filter or Function.
2. peek(Consumer action)
Purpose: performs no transformation; it takes a Consumer, much like System.out.println, and is useful for inspecting the stream.
3. partitionBy(Fields fields)
Purpose: redirects tuples to the next processing stage according to the configured fields; tuples with the same values for those fields are guaranteed to be processed by the same thread.
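As a quick illustration, here is a minimal sketch chaining the three calls above (the spout contents, the "word" field, and the inline filter are assumptions for illustration, not taken from a concrete example):

FixedBatchSpout spout = new FixedBatchSpout(new Fields("word"), 3,
        new Values("storm"), new Values("trident"), new Values("batch"));

TridentTopology topology = new TridentTopology();
topology.newStream("api-sketch", spout)
        .each(new Fields("word"), new BaseFilter() {   // filter every tuple in the batch
            @Override
            public boolean isKeep(TridentTuple tuple) {
                return !tuple.getString(0).isEmpty();  // keep non-empty words
            }
        })
        .peek(t -> System.out.println("saw: " + t))    // inspect without transforming
        .partitionBy(new Fields("word"));              // same word -> same partition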
III. Common Trident Functions
1. FilterFunction (filtering)
Purpose: filters the tuple data in a batch.
How: define a class that extends BaseFilter, override isKeep(), and pass an instance to each().
2. SumFunction (summation)
Purpose: performs addition or subtraction on values in the stream.
How: define a class that extends BaseFunction, override execute(), and use it in each().
3. MapFunction (one-to-one function)
Purpose: applies a custom operation to each tuple.
How: define a class that implements the MapFunction interface, override execute(), and apply it via map().
4. ProjectionFunction (projection)
Purpose: keeps only the specified fields of the stream.
How: list the desired fields in project().
Example: given a stream with the fields ["x","y","z"], applying project(new Fields("y","z")) produces a stream containing only the fields ["y","z"].
5. Repartition (redirection)
Repartitioning determines how tuples are routed to the next stage:
shuffle: balances tuples across the partitions using a random-assignment algorithm.
broadcast: every tuple is replicated to all partitions; this is very useful with DRPC, for example to run a stateQuery on every partition.
partitionBy: partitions by the given list of fields; concretely, the hash of those fields is taken modulo the number of partitions, so tuples with the same field values always land in the same partition.
global: all tuples are sent to a single partition, which processes the entire stream's tuples on its own dedicated thread.
batchGlobal: all tuples of one batch go to the same partition; different batches may go to different partitions.
partition: partitions via a custom function that implements the CustomStreamGrouping interface (backtype.storm.grouping.CustomStreamGrouping before Storm 1.0, org.apache.storm.grouping.CustomStreamGrouping since).
6. Aggregation
Trident processes data in batches, so aggregation likewise runs over the data within a batch. Tuples that pass through an aggregation have their original content replaced by the aggregated result. The Aggregator interface has three methods to implement:
init(): called when the batch arrives; initializes the aggregation state.
aggregate(): called for every tuple received in the batch; this step acts as a repartitioning operation and may run the aggregation in a separate thread.
complete(): called when the batch ends.
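To make the three callbacks concrete, here is a minimal sketch of a custom Aggregator that counts the tuples of each batch (the class name BatchCount and its State holder are assumptions for illustration):

static class BatchCount extends BaseAggregator<BatchCount.State> {
    static class State { long count = 0; }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State();                        // runs once, when the batch starts
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        state.count++;                             // runs for every tuple in the batch
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.count));   // runs once, when the batch ends
    }
}

// usage: .aggregate(new Fields("name"), new BatchCount(), new Fields("count"))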
6.1 partitionAggregate
It runs the aggregation over each batch on the current partition; it is not a repartitioning operation, i.e. it simply aggregates the tuples of the batch in place.
6.2 aggregator
Aggregates the tuple data within a batch.
6.3 ReduceAggregator
Applies a reduce operation over a batch, accumulating the nth field of each tuple.
6.4 CombinerAggregator
Aggregates the tuples within a batch; it is a repartitioning operation.
6.5 persistentAggregate
A persistent aggregator: the data is first stored somewhere, and the aggregation is then run against that stored state.
6.6 Aggregate chain (chainedAgg)
An aggregation chain applies several aggregation operations to the tuples of one batch.
7.GroupBy
GroupBy splits the whole stream into individual grouped streams according to the specified fields; an aggregation applied to a grouped stream then runs per group rather than over the whole batch. If groupBy is followed by an aggregator it is a full aggregation; if it is followed by partitionAggregate it is not. A minimal sketch of the contrast follows.
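Both calls below appear in the examples of section IV; the stream variable and the "name" field are assumed here for illustration (Count is Trident's built-in aggregator):

// groupBy + aggregate: a true aggregation, one count per distinct "name"
stream.groupBy(new Fields("name"))
      .aggregate(new Fields("name"), new Count(), new Fields("count"));

// partitionBy + partitionAggregate: counts only within each partition,
// with no group-level combining step
stream.partitionBy(new Fields("name"))
      .partitionAggregate(new Fields("name"), new Count(), new Fields("count"));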
IV. Examples of Common Trident Functions
1.FilterFunction
Requirement: from a set of data, keep the tuples whose first and second values sum to an even number.
public class FilterTrident {
    private static final Logger LOG = LoggerFactory.getLogger(FilterTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3,
                new Values(1,4,7,10),
                new Values(1,1,3,11),
                new Values(2,2,7,1),
                new Values(2,5,7,2));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        // peek: no transformation; its argument is a Consumer
        // each: applies the filter to the given fields of every tuple from the spout
        topology.newStream("filter", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(input -> LOG.info("peek1 ================{},{},{},{}",
                        input.get(0), input.get(1), input.get(2), input.get(3)))
                .parallelismHint(2)
                .localOrShuffle()
                .each(new Fields("a","b"), new CheckEvenSumFilter())
                .parallelismHint(2)
                .localOrShuffle()
                .peek(input -> LOG.info("peek2 +++++++++++++++++++{},{},{},{}",
                        input.getIntegerByField("a"), input.getIntegerByField("b"),
                        input.getIntegerByField("c"), input.getIntegerByField("d")))
                .parallelismHint(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("FilterTrident", conf, topology.build());
        LOG.warn("==================================================");
        LOG.warn("the LocalCluster topology {} is submitted.", "FilterTrident");
        LOG.warn("==================================================");
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("FilterTrident");
        cluster.shutdown();
    }

    // keep only the tuples whose fields a and b sum to an even number
    private static class CheckEvenSumFilter extends BaseFilter {
        @Override
        public boolean isKeep(TridentTuple tuple) {
            Integer a = tuple.getIntegerByField("a");
            Integer b = tuple.getIntegerByField("b");
            return (a + b) % 2 == 0;
        }
    }
}
2.SumFunction
Requirement: sum the first two values of each tuple in a set of data.
public class SumFunctionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(SumFunctionTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3,
                new Values(1,4,7,10),
                new Values(1,1,3,11),
                new Values(2,2,7,1),
                new Values(2,5,7,2));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        // peek: no transformation; its argument is a Consumer
        // each: applies SumFunction to fields a and b, appending the new field "sum"
        topology.newStream("function", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(input -> LOG.info("peek1 ================{},{},{},{}",
                        input.get(0), input.get(1), input.get(2), input.get(3)))
                .parallelismHint(2)
                .localOrShuffle()
                .each(new Fields("a","b"), new SumFunction(), new Fields("sum"))
                .parallelismHint(2)
                .localOrShuffle()
                .peek(input -> LOG.info("peek2 ================{},{},{},{},{}",
                        input.getIntegerByField("a"), input.getIntegerByField("b"),
                        input.getIntegerByField("c"), input.getIntegerByField("d"),
                        input.getIntegerByField("sum")))
                .parallelismHint(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("SumFunctionTrident", conf, topology.build());
        LOG.warn("==================================================");
        LOG.warn("the LocalCluster topology {} is submitted.", "SumFunctionTrident");
        LOG.warn("==================================================");
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("SumFunctionTrident");
        cluster.shutdown();
    }

    private static class SumFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            Integer a = tuple.getIntegerByField("a");
            Integer b = tuple.getIntegerByField("b");
            collector.emit(new Values(a + b));
        }
    }
}
3.MapFunction
Requirement: convert the string in each tuple of a batch to upper case.
public class MapFunctionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(MapFunctionTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException, AlreadyAliveException,
            InvalidTopologyException, AuthorizationException {
        boolean isRemoteMode = false;
        if (args.length > 0) {
            isRemoteMode = true;
        }

        FixedBatchSpout spout = new FixedBatchSpout(new Fields("line"), 3,
                new Values("hello stream"),
                new Values("hello kafka"),
                new Values("hello hadoop"),
                new Values("hello scala"),
                new Values("hello java"));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);

        topology.newStream("hello", spout).parallelismHint(1)
                .localOrShuffle()
                .map(new MyMapFunction(), new Fields("upper"))
                .parallelismHint(2)
                .partition(Grouping.fields(ImmutableList.of("upper")))
                .peek(input -> LOG.warn("================>> peek process value:{}",
                        input.getStringByField("upper")))
                .parallelismHint(3);

        if (isRemoteMode) {
            StormSubmitter.submitTopology("HelloTridentTopology", conf, topology.build());
            LOG.warn("==================================================");
            LOG.warn("the remote topology {} is submitted.", "HelloTridentTopology");
            LOG.warn("==================================================");
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HelloTridentTopology", conf, topology.build());
            LOG.warn("==================================================");
            LOG.warn("the LocalCluster topology {} is submitted.", "HelloTridentTopology");
            LOG.warn("==================================================");
            TimeUnit.SECONDS.sleep(5);
            cluster.killTopology("HelloTridentTopology");
            cluster.shutdown();
        }
    }

    private static class MyMapFunction implements MapFunction {
        private static final Logger LOG = LoggerFactory.getLogger(MyMapFunction.class);

        @Override
        public Values execute(TridentTuple input) {
            String line = input.getStringByField("line");
            LOG.warn("================>> myMapFunction process execute:value :{}", line);
            return new Values(line.toUpperCase());
        }
    }
}
4. ProjectionFunction
Requirement: keep only part of the data in each tuple.
public class ProjectionFunctionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(ProjectionFunctionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("x","y","z"), 3,
                new Values(1,2,3),
                new Values(4,5,6),
                new Values(7,8,9),
                new Values(10,11,12));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("ProjectionTrident", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(tridentTuple -> LOG.info("================ {}", tridentTuple))
                .parallelismHint(2)
                .shuffle()
                .project(new Fields("y","z"))
                .parallelismHint(2)
                .localOrShuffle()
                .peek(tridentTuple -> LOG.info(">>>>>>>>>>>>>>>> {}", tridentTuple))
                .parallelismHint(2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("ProjectionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("ProjectionTrident");
        cluster.shutdown();
    }
}
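5.1 Shuffle
Requirement: distribute the tuples of a batch randomly and evenly across partitions.
A minimal sketch, modeled on the Broadcast example that follows; the class name ShuffleRepartitionTrident is an assumption:

public class ShuffleRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("hadoop",3),
                new Values("java",4),
                new Values("hadoop",5));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        // shuffle: random, load-balanced routing of tuples to the downstream partitions
        topology.newStream("ShuffleRepartitionTrident", spout).parallelismHint(1)
                .shuffle()
                .peek(tridentTuple -> LOG.info("================ {}", tridentTuple))
                .parallelismHint(2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("ShuffleRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("ShuffleRepartitionTrident");
        cluster.shutdown();
    }
}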
5.2 Broadcast
Requirement: send every tuple of a batch to all partitions.
public class BroadcastRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(BroadcastRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("hadoop",3),
                new Values("java",4),
                new Values("hadoop",5));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        // broadcast: every tuple is replicated to all downstream partitions
        topology.newStream("BroadcastRepartitionTrident", spout).parallelismHint(1)
                .broadcast()
                .peek(tridentTuple -> LOG.info("================ {}", tridentTuple))
                .parallelismHint(2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("BroadcastRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("BroadcastRepartitionTrident");
        cluster.shutdown();
    }
}
5.3 PartitionBy
Requirement: route the tuples of a batch to the same task according to the configured field.
public class PartitionByRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionByRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        // FixedBatchSpout parameters:
        // 1. the spout's output field names
        // 2. how many tuples make up one batch
        // 3. the tuple values
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",23),
                new Values("scala",3),
                new Values("hadoop",10),
                new Values("java",23),
                new Values("hadoop",10));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                .partitionBy(new Fields("language"))   // same language -> same partition
                .peek(tridentTuple -> LOG.info("++++++++++++++++ {}", tridentTuple))
                .parallelismHint(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("PartitionByRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("PartitionByRepartitionTrident");
        cluster.shutdown();
    }
}
5.4 Global
Requirement: globally aggregate the tuples of a batch after grouping.
public class GlobalRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(GlobalRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        // FixedBatchSpout parameters:
        // 1. the spout's output field names
        // 2. how many tuples make up one batch
        // 3. the tuple values
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",23),
                new Values("scala",3),
                new Values("hadoop",10),
                new Values("java",23),
                new Values("hadoop",10));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("GlobalRepartitionTrident", spout).parallelismHint(1)
                .partitionBy(new Fields("language"))
                .parallelismHint(3)   // whatever parallelism is configured here, global() is unaffected
                .peek(tridentTuple -> LOG.info(" ================= {}", tridentTuple))
                .global()             // route every tuple to a single partition
                .peek(tridentTuple -> LOG.info(" >>>>>>>>>>>>>>>>> {}", tridentTuple));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("GlobalRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("GlobalRepartitionTrident");
        cluster.shutdown();
    }
}
5.5 batchGlobal
Requirement: send tuples of different batches to different tasks.
public class BatchGlobalRepartitionTrident2 {
    private static final Logger LOG = LoggerFactory.getLogger(BatchGlobalRepartitionTrident2.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        // batchGlobal: all tuples of one batch go to the same partition;
        // different batches may go to different partitions
        topology.newStream("BatchGlobalRepartitionTrident2", spout).parallelismHint(1)
                .batchGlobal()
                .peek(tridentTuple -> LOG.info("++++++++++++++++ {}", tridentTuple))
                .parallelismHint(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("BatchGlobalRepartitionTrident2", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("BatchGlobalRepartitionTrident2");
        cluster.shutdown();
    }
}
5.6 partition
Requirement: a custom partition function.
public class CustomRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(CustomRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("hadoop",3),
                new Values("java",4),
                new Values("hadoop",5));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("CustomRepartitionTrident", spout).parallelismHint(1)
                .partition(new HighTaskIDGrouping())
                .peek(tridentTuple -> LOG.info("++++++++++++++++ {}", tridentTuple))
                .parallelismHint(2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("CustomRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("CustomRepartitionTrident");
        cluster.shutdown();
    }
}

/**
 * Custom grouping: always route tuples to the task with the highest task ID.
 * @author pengbo.zhao
 */
public class HighTaskIDGrouping implements CustomStreamGrouping {
    private int taskID;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        // targetTasks: the collection of all downstream tasks
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);                    // sort ascending
        this.taskID = tasks.get(tasks.size() - 1);  // pick the largest task ID
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return Arrays.asList(taskID);
    }
}
6.1 partitionAggregate
Requirement: count the number of tuples in a batch.
public class PartitionAggregateTrident {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionAggregateTrident.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        this.spout.setCycle(false);
    }

    @Test
    public void testPartitionAggregator() {
        TridentTopology topology = new TridentTopology();
        // the spout's parallelism is fixed at 1 internally, so the hint of 2 has no effect
        topology.newStream("PartitionAggregateTrident", spout).parallelismHint(2)
                .shuffle()
                .partitionAggregate(new Fields("name","age"), new Count(), new Fields("count"))
                .parallelismHint(2)
                // .each(new Fields("count"), new Debug());
                .peek(input -> LOG.info(" >>>>>>>>>>>>>>>>> {}", input.getLongByField("count")));
        this.submitTopology("PartitionAggregateTrident", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
6.2 aggregator
Requirement: count the data in the tuples.
public class AggregateTrident {
    private static final Logger LOG = LoggerFactory.getLogger(AggregateTrident.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        this.spout.setCycle(false);
    }

    @Test
    public void testAggregate() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("AggregateTrident", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("name","age"), new Count(), new Fields("count"))
                // .aggregate(new Fields("name","age"), new CountAsAggregator(), new Fields("count"))
                .parallelismHint(2)
                .each(new Fields("count"), new Debug())
                .peek(input -> LOG.info("============> count:{}", input.getLongByField("count")));
        this.submitTopology("AggregateTrident", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
6.3 reduceAggregator
Requirement: sum the 0th field of the tuples in a batch; that is, however many tuples a batch contains, accumulate the specified field across all of them.
public class ReduceAggregatorTrident {
    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        this.spout.setCycle(false);
    }

    @Test
    public void testReduceAggregator() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("ReduceAggregator", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("age","name"), new MyReduce(), new Fields("sum"))
                .parallelismHint(5)
                .each(new Fields("sum"), new Debug());
        this.submitTopology("ReduceAggregator", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }

    static class MyReduce implements ReducerAggregator<Integer> {
        @Override
        public Integer init() {
            return 0;   // the initial value is 0
        }

        @Override
        public Integer reduce(Integer curr, TridentTuple tuple) {
            return curr + tuple.getInteger(0);   // accumulate field 0 ("age")
        }
    }
}
6.4 combinerAggregate
Requirement: sum a field across the tuples.
public class CombinerAggregate {
    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        this.spout.setCycle(false);
    }

    @Test
    public void testCombinerAggregate() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("CombinerAggregate", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("age"), new MyCount(), new Fields("count"))
                .parallelismHint(5)
                .each(new Fields("count"), new Debug());
        this.submitTopology("CombinerAggregate", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }

    static class MyCount implements CombinerAggregator<Integer> {
        @Override
        public Integer init(TridentTuple tuple) {
            return tuple.getInteger(0);   // seed with the tuple's own value
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return val1 + val2;
        }

        @Override
        public Integer zero() {
            return 0;
        }
    }
}
6.5 persistenceAggregator
Requirement: count the tuple elements of a batch, persisting the aggregation state.
public class PersistenceAggregator {
    private static final Logger LOG = LoggerFactory.getLogger(PersistenceAggregator.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        this.spout.setCycle(false);
    }

    @Test
    public void testPersistenceAggregator() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("testPersistenceAggregator", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .persistentAggregate(new MemoryMapState.Factory(),
                        new Fields("name"), new Count(), new Fields("count"))
                .parallelismHint(4)
                .newValuesStream()
                .peek(input -> LOG.info("count:{}", input.getLongByField("count")));
        this.submitTopology("testPersistenceAggregator", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
6.6 AggregateChain
Requirement: run a count, a sum, and another count over the tuples of a batch.
public class AggregateChain {
    private static final Logger LOG = LoggerFactory.getLogger(AggregateChain.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        this.spout.setCycle(false);
    }

    @Test
    public void testAggregateChain() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("AggregateChain", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .chainedAgg()   // begin the aggregation chain
                .aggregate(new Fields("name"), new Count(), new Fields("count"))
                .aggregate(new Fields("age"), new Sum(), new Fields("sum"))
                .aggregate(new Fields("age"), new Count(), new Fields("count2"))
                .chainEnd()     // end the chain; all three results are emitted together
                .peek(tuple -> LOG.info("{}", tuple));
        this.submitTopology("AggregateChain", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
7.GroupBy
Requirement: group the tuples of a batch by name, then count the tuples within each group.
public class GroupBy {
    private static final Logger LOG = LoggerFactory.getLogger(GroupBy.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name","age"), 3,
                new Values("java",1),
                new Values("scala",2),
                new Values("scala",3),
                new Values("hadoop",4),
                new Values("java",5),
                new Values("hadoop",6));
        this.spout.setCycle(false);
    }

    @Test
    public void testGroupBy() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("GroupBy", spout).parallelismHint(1)
                // .partitionBy(new Fields("name"))
                .groupBy(new Fields("name"))
                .aggregate(new Count(), new Fields("count"))
                .peek(tuple -> LOG.info("{},{}", tuple.getFields(), tuple));
        this.submitTopology("GroupBy", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}