flink學習筆記-split & select(拆分流)


說明:本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:

 Flink大數據項目實戰:http://t.cn/EJtKhaz

 

 

split

 

1.DataStream SplitStream

 

2.按照指定標准將指定的DataStream拆分成多個流用SplitStream來表示

 

 

 

select

 

1.SplitStream DataStream

 

2.split搭配使用,從SplitStream中選擇一個或多個流

 

 

 

案例:

 

public class TestSplitAndSelect {

 

    public static void main(String[] args) throws Exception {

 

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

 

 

 

        DataStream<Long> input=env.generateSequence(0,10);

 

 

 

        SplitStream<Long> splitStream = input.split(new OutputSelector<Long>() {

 

 

 

            @Override

 

            public Iterable<String> select(Long value) {

 List<String> output = new ArrayList<String>();

                if (value % 2 == 0) {

                    output.add("even");

                }

                else {

                    output.add("odd");

                }

                return output;

            }

 

        });

 

        //splitStream.print();

 

        DataStream<Long> even = splitStream.select("even");

        DataStream<Long> odd = splitStream.select("odd");

        DataStream<Long> all = splitStream.select("even","odd");

 

        //even.print();

 

        odd.print();

 

        //all.print();

 

        env.execute();

    }

}

1.12 project

含義:從Tuple中選擇屬性的子集

 

限制:

1.僅限event數據類型為TupleDataStream

2.僅限Java API

 

使用場景:

ETL時刪減計算過程中不需要的字段

案例:

public class TestProject {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

 DataStreamSource<Tuple4<String,String,String,Integer>> input=env.fromElements(TRANSCRIPT);

 

        DataStream<Tuple2<String, Integer>> out = input.project(1,3);

 

        out.print();

 

        env.execute();

 

    }

 

    public static final Tuple4[] TRANSCRIPT = new Tuple4[] {

            Tuple4.of("class1","張三","語文",100),

            Tuple4.of("class1","李四","語文",78),

            Tuple4.of("class1","王五","語文",99),

            Tuple4.of("class2","趙六","語文",81),

            Tuple4.of("class2","錢七","語文",59),

            Tuple4.of("class2","馬二","語文",97)

    };

}

1.13 assignTimestampsAndWatermarks

含義:提取記錄中的時間戳作為Event time,主要在window操作中發揮作用,不設置默認就是ProcessingTime

 

限制:

只有基於event time構建window時才起作用

 

使用場景:

當你需要使用event time來創建window時,用來指定如何獲取event的時間戳

 

案例:講到window時再說

1.14 window相關Operators

放在講解完Event Time之后在細講

 

構建window

 

1.window

 

2.windowAll

 

 

 

window上的操作

 

1.Window ApplyWindow Reduce

 

2.Window Fold

 

3.Aggregations on windows(summinmaxminBymaxBy)

 

4.Window Join

 

5.Window CoGroup

2. 物理分區

2.1回顧 Streaming DataFlow

 

2.2並行化DataFlow

2.3算子間數據傳遞模式

One-to-one streams

保持元素的分區和順序

 

Redistributing streams

1.改變流的分區

2.重新分區策略取決於使用的算子

akeyBy() (re-partitions by hashing the key) 

bbroadcast()

crebalance() (which re-partitions randomly)

 

2.4物理分區

 

能夠對分區在物理上進行改變的算子如下圖所示:

2.5 rescale

通過輪詢調度將元素從上游的task一個子集發送到下游task的一個子集。

原理:

第一個task並行度為2,第二個task並行度為6,第三個task並行度為2。從第一個task到第二個taskSrc的子集Src1 Map的子集Map123對應起來,Src1會以輪詢調度的方式分別向Map123發送記錄。從第二個task到第三個taskMap的子集123對應Sink的子集1,這三個流的元素只會發送到Sink1。假設我們每個TaskManager有三個Slot,並且我們開了SlotSharingGroup,那么通過rescale,所有的數據傳輸都在一個TaskManager內,不需要通過網絡。

2.6任務鏈和資源組相關操作

startNewChain()表示從這個操作開始,新啟一個新的chain

someStream.filter(...).map(...).startNewChain().map(...)

如上一段操作,表示從map()方法開始,新啟一個新的chain

 

如果禁用任務鏈可以調用disableChaining()方法。

 

如果想單獨設置一個SharingGroup,可以調用slotSharingGroup("name")方法。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM