Detailed Flink API Usage Examples: Windows



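Flink window mechanism, worked examples:

What is a window, and what is it used for?

A window splits an unbounded stream into finite groups of elements over which aggregations can be computed. Windows fall into two broad categories:

1. Time-based windows

2. Count-based windows

The Flink DataStream API provides both time and count windows, and adds session-based windows as well.

In addition, for special requirements, the DataStream API offers customizable window operations so that users can define their own windows.

The sections below cover time-based windows, count-based windows, and custom window operations. Session-based windows will be covered in a later article.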

 

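1. Time-Based Window

Computation over a time-based window falls into two kinds:

incremental aggregation and full-window aggregation.

--------------------------------Incremental aggregation-------------------------------:

Similar to a group window in Flink SQL: the result is updated continuously as each element arrives, so only the running aggregate has to be kept.

------------------------------------------------------------------------------

Code example:

1.1 Tumbling window
The window operation here is applied to a keyed stream. When timeWindow is given a single argument, it defines a tumbling window. The example below emits one count per key for each 60-minute window and is written in Java. (Note that timeWindow is deprecated in newer Flink releases in favor of window(...) with an explicit window assigner.)

Incremental aggregation code ---- counting: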

        // apply the operators once the data source is available
        DataStream<StartAppCount> windowedData = startupInfoData.keyBy("appId")     // group by appId
                        .timeWindow(Time.minutes(60))                 // tumbling window with a size of 60 minutes
                        .aggregate(new CountAgg(), new WindowResultFunction());

        windowedData.print();

CountAgg is a user-defined function that implements the AggregateFunction interface:

public class CountAgg implements AggregateFunction<StartupInfoData, Long, Long> {

    @Override
    public Long createAccumulator() {                            // initialize the accumulator
        return 0L;
    }

    @Override
    public Long add(StartupInfoData startupInfoData, Long acc) {    // for each incoming element, increment the accumulator by 1
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {                              // emit the final result held in the accumulator
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {                     // combine two partial accumulators
        return acc1 + acc2;
    }
}

The window result (output) function:

public class WindowResultFunction implements WindowFunction<Long, StartAppCount, Tuple, TimeWindow>
{
    @Override
    public void apply(
            Tuple key,                              // the window key, i.e. appId
            TimeWindow window,                      // the window
            Iterable<Long> aggregateResult,         // result of the aggregate function, i.e. the count
            Collector<StartAppCount> collector      // output type is StartAppCount
    ) throws Exception
    {
        String appId = ((Tuple1<String>) key).f0;
        Long count = aggregateResult.iterator().next();
        collector.collect(StartAppCount.of(appId, window.getEnd(), count));
    }
}

The custom output class:

public class StartAppCount {

    public String appId;     // application ID
    public long windowEnd;  // window end timestamp
    public long count;  // number of app starts counted in the window

    public static StartAppCount of (String appId, long windowEnd, long count) {
        StartAppCount result = new StartAppCount();
        result.appId = appId;
        result.windowEnd = windowEnd;
        result.count = count;
        return result;
    }

    @Override
    public String toString() {
        return "StartAppCount{" +
                "appId='" + appId + '\'' +
                ", count=" + count +
                '}';
    }

}
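
To make the windowEnd value above more concrete, here is a minimal plain-Java sketch (no Flink dependency) of how the bounds of a tumbling window could be derived from an element timestamp. It assumes windows are aligned to the epoch with no offset; the class and method names are hypothetical and are not part of the Flink API.

```java
// Plain-Java sketch: bounds of the tumbling window that contains a timestamp,
// assuming windows are aligned to the epoch (offset 0). Names are hypothetical.
final class TumblingBoundsSketch {

    /** Start (inclusive) of the tumbling window of the given size containing timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        long ts = 2 * hour + 15 * 60 * 1000L; // 2 h 15 min after the epoch
        System.out.println(windowStart(ts, hour) + " .. " + windowEnd(ts, hour));
    }
}
```

With a 60-minute window, every element whose timestamp falls between the start (inclusive) and the end (exclusive) contributes to the same count.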

 

Incremental aggregation code ---- average:

public class AverageAggregate implements AggregateFunction<Tuple2<String,Long>, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> acc) {   // acc is the cached intermediate state
        return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1L);   // f0 accumulates the running sum of the values, f1 the element count
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return (double)acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {    // combine two partial accumulators
        return new Tuple2<>(acc1.f0+acc2.f0, acc1.f1+acc2.f1);
    }
}
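
The accumulator logic above can be exercised without a Flink cluster. The following is a minimal plain-Java sketch that mirrors the create/add/merge/getResult steps with a small hypothetical Acc class standing in for Tuple2<Long, Long>; it only illustrates why merging partial accumulators gives the same average as processing all elements in one pass.

```java
// Plain-Java sketch of the (sum, count) accumulator; Acc is a hypothetical
// stand-in for Tuple2<Long, Long> and is not part of the Flink API.
final class AverageSketch {

    static final class Acc {
        final long sum;
        final long count;
        Acc(long sum, long count) { this.sum = sum; this.count = count; }
    }

    static Acc create() { return new Acc(0L, 0L); }

    /** Fold one value into the accumulator. */
    static Acc add(long value, Acc acc) { return new Acc(acc.sum + value, acc.count + 1L); }

    /** Combine two partial accumulators. */
    static Acc merge(Acc a, Acc b) { return new Acc(a.sum + b.sum, a.count + b.count); }

    /** Final average; yields NaN for an empty accumulator. */
    static double result(Acc acc) { return (double) acc.sum / acc.count; }

    /** Average of the given values, computed in a single pass. */
    static double average(long... values) {
        Acc acc = create();
        for (long v : values) {
            acc = add(v, acc);
        }
        return result(acc);
    }

    public static void main(String[] args) {
        Acc left = add(3, add(2, add(1, create())));   // partial result over 1, 2, 3
        Acc right = add(5, add(4, create()));          // partial result over 4, 5
        System.out.println(result(merge(left, right)));
        System.out.println(average(1, 2, 3, 4, 5));
    }
}
```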

 

Summing with sum:

        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))// window size of 2 seconds, sliding every 1 second
                .sum("count");// either sum or reduce can be used here
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        // print the results to the console with a parallelism of 1
        windowCounts.print().setParallelism(1);

 

Summing with reduce:

        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))// window size of 2 seconds, sliding every 1 second
                //.sum("count");// either sum or reduce can be used here
                .reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                });

 

 

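--------------------------------Full-window time window operation-------------------------------:

Here all elements of the window are collected first and handed to the function together when the window fires.

Code example: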

public class MyprocessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> out) throws Exception {
        long count = 0;
        for(Tuple2<String,Long> in : iterable)
        {
            count++;
        }

        out.collect("Window: " + context.window() + " count: " + count);
    }
}
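
To contrast the two styles, here is a minimal plain-Java sketch (no Flink dependency). The incremental version keeps a single counter, while the full-window version buffers every element and can therefore compute something like a median, which needs the whole window at once. The class and method names are hypothetical and only illustrate the trade-off.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch contrasting incremental and full-window aggregation.
// All names here are hypothetical; nothing below is Flink API.
final class IncrementalVsFullSketch {

    /** Incremental style: the only state is one running counter. */
    static long incrementalCount(long[] elements) {
        long acc = 0L;                       // plays the role of createAccumulator()
        for (long ignored : elements) {
            acc = acc + 1;                   // plays the role of add(value, acc)
        }
        return acc;                          // plays the role of getResult(acc)
    }

    /** Full-window style: buffer every element, then process them when the window fires. */
    static double fullWindowMedian(long[] elements) {
        List<Long> buffer = new ArrayList<>();
        for (long e : elements) {
            buffer.add(e);                   // state grows with the window contents
        }
        if (buffer.isEmpty()) {
            throw new IllegalArgumentException("empty window");
        }
        Collections.sort(buffer);
        int n = buffer.size();
        return (n % 2 == 1)
                ? buffer.get(n / 2)
                : (buffer.get(n / 2 - 1) + buffer.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        long[] window = {5, 1, 3, 2, 4};
        System.out.println(incrementalCount(window));
        System.out.println(fullWindowMedian(window));
    }
}
```

The design choice is between memory and flexibility: incremental aggregation keeps constant-size state per window, while full-window processing pays for buffering in exchange for access to all elements and the window metadata.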

 

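1.2 Sliding window

When timeWindow is given two arguments, the first is the window size and the second is the slide interval.

        // apply the operators once the data source is available
        DataStream<StartAppCount> windowedData = startupInfoData.keyBy("appId")     // group by appId
                        .timeWindow(Time.minutes(60), Time.seconds(5))                   // window size of 60 minutes, sliding every 5 seconds
                        .aggregate(new CountAgg(), new WindowResultFunction());

        windowedData.print();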
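
Because sliding windows overlap, one element belongs to several windows at once. The following plain-Java sketch (no Flink dependency, hypothetical names) lists the start timestamps of every sliding window that contains a given timestamp, assuming epoch-aligned windows with no offset. With a 60-minute size and a 5-second slide, each element falls into 720 windows, which is worth keeping in mind when choosing these parameters.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: which sliding windows contain a given timestamp.
// Assumes windows aligned to the epoch; names are hypothetical.
final class SlidingAssignSketch {

    /** Start timestamps of all windows [start, start + sizeMs) that contain timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // the latest window start that is not after the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // walk backwards while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 2 s, slide 1 s: the element at 2500 ms belongs to two windows
        System.out.println(windowStarts(2500L, 2000L, 1000L));
        // size 60 min, slide 5 s: number of windows per element
        System.out.println(windowStarts(36_000_001L, 3_600_000L, 5_000L).size());
    }
}
```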

 

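2. Count-Based Window


2.1 Tumbling Window (tumbling count window)

Like time-based windows, count-based windows support both tumbling and sliding variants. The example below works on a keyed stream and reduces every 100 elements per key into one result (here the reduce function concatenates the f1 values).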

public class FlinkCountWindowDemo {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        final int windowSize = params.getInt("window", 100);

        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());

        // calculate
        DataStream<Tuple2<String, String>> outStream = inStream
                .keyBy(0)
                .countWindow(windowSize)
                .reduce(
                    new ReduceFunction<Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
                            return Tuple2.of(value1.f0, value1.f1 + "" + value2.f1);
                        }
                    }
                );
        outStream.print();
        env.execute("WindowWordCount");
    }
}
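
The behavior of a tumbling count window can be sketched in plain Java without Flink. The simulation below (hypothetical names, not Flink API) keeps one reduced value and one counter per key and emits a result only when a key has received exactly windowSize elements. One consequence it makes visible: a key that never reaches the window size produces no output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed tumbling count window with a
// string-concatenating reduce. Names are hypothetical.
final class TumblingCountSketch {

    /** Each input record is {key, value}; returns the emitted "key=reducedValue" results. */
    static List<String> run(int windowSize, String[][] keyedInput) {
        Map<String, String> reduced = new HashMap<>();   // running reduce result per key
        Map<String, Integer> counts = new HashMap<>();   // elements seen in the current window per key
        List<String> emitted = new ArrayList<>();
        for (String[] record : keyedInput) {
            String key = record[0];
            String value = record[1];
            reduced.merge(key, value, (a, b) -> a + b);  // same shape as value1.f1 + value2.f1
            int seen = counts.merge(key, 1, Integer::sum);
            if (seen == windowSize) {                    // window is full: emit and start over
                emitted.add(key + "=" + reduced.remove(key));
                counts.remove(key);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] input = {
            {"a", "1"}, {"b", "x"}, {"a", "2"}, {"a", "3"}, {"b", "y"}, {"a", "4"}
        };
        System.out.println(run(3, input));
    }
}
```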

 

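2.2 Sliding Window

Borrowing a figure from the article "Flink 原理與實現:Window 機制": suppose there is a sliding count window that, every 2 elements, computes the sum of the most recent 4 elements. The figure illustrates how such a window works:

[Figure: sliding count window, size 4, slide 2]

Code example (note that the code defaults to a window of 3 and a slide of 2 unless the window and slide parameters are supplied):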

public class FlinkCountWindowDemo {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        final int windowSize = params.getInt("window", 3);
        final int slideSize = params.getInt("slide", 2);

        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());

        // calculate
        DataStream<Tuple2<String, String>> outStream = inStream
                                                           .keyBy(0)
                                                           .countWindow(windowSize, slideSize)
                                                           .reduce(
                                                               new ReduceFunction<Tuple2<String, String>>() {
                                                                   @Override
                                                                   public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
                                                                       return Tuple2.of(value1.f0, value1.f1 + "" + value2.f1);
                                                                   }
                                                               }
                                                           );
        outStream.print();
        env.execute("WindowWordCount");
    }
}
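
The sliding count window described above (every 2 elements, sum the most recent 4) can be simulated in plain Java. This is a sketch under the assumption that the window fires after every slide elements over at most the last size elements, so the earliest firings see fewer than size elements; the names are hypothetical and the code handles a single key only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation of a sliding count window for one key.
// Assumption: fire every `slide` elements over the last `size` elements.
final class SlidingCountSketch {

    static List<Long> slidingSums(long[] values, int size, int slide) {
        Deque<Long> recent = new ArrayDeque<>();   // holds at most `size` most recent elements
        List<Long> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (long v : values) {
            recent.addLast(v);
            if (recent.size() > size) {
                recent.removeFirst();              // drop the oldest element
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {          // time to fire
                long sum = 0L;
                for (long r : recent) {
                    sum += r;
                }
                sums.add(sum);
                sinceLastFire = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(slidingSums(new long[]{1, 2, 3, 4, 5, 6}, 4, 2));
    }
}
```

For the input 1 to 6, the firings cover {1, 2}, then {1, 2, 3, 4}, then {3, 4, 5, 6}.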

 

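3. Advanced Window (custom windows)

A custom window requires three functions to be specified.
3.1 Window Assigner: responsible for assigning each element to one or more windows.

 

The window API exposes WindowAssigner for customization: we can extend it and implement its public abstract Collection<W> assignWindows(T element, long timestamp) method (in newer Flink versions this method takes an additional WindowAssignerContext argument). For count-based windows, the default window assigner is GlobalWindows, for example: keyValue.window(GlobalWindows.create())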
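
As a rough illustration of what a custom assigner does, here is a plain-Java sketch that does not use Flink at all. The Assigner interface and Window class are hypothetical, simplified stand-ins for WindowAssigner and TimeWindow, and the example assigner places each element into a tumbling window shifted by a fixed offset (for instance, to align daily windows to a local time zone).

```java
import java.util.Collection;
import java.util.Collections;

// Plain-Java sketch of the assigner idea. Assigner and Window are hypothetical
// stand-ins, not the Flink classes of similar names.
final class CustomAssignerSketch {

    /** A window covering [start, end). */
    static final class Window {
        final long start;
        final long end;
        Window(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    /** Simplified counterpart of assignWindows(T element, long timestamp). */
    interface Assigner<T> {
        Collection<Window> assignWindows(T element, long timestamp);
    }

    /** Tumbling windows of sizeMs whose boundaries are shifted by offsetMs. */
    static final class OffsetTumblingAssigner<T> implements Assigner<T> {
        private final long sizeMs;
        private final long offsetMs;

        OffsetTumblingAssigner(long sizeMs, long offsetMs) {
            this.sizeMs = sizeMs;
            this.offsetMs = offsetMs;
        }

        @Override
        public Collection<Window> assignWindows(T element, long timestamp) {
            // floorMod keeps the arithmetic correct when timestamp - offsetMs is negative
            long start = timestamp - Math.floorMod(timestamp - offsetMs, sizeMs);
            return Collections.singletonList(new Window(start, start + sizeMs));
        }
    }

    public static void main(String[] args) {
        Assigner<String> assigner = new OffsetTumblingAssigner<>(10L, 3L);
        System.out.println(assigner.assignWindows("event", 12L));
    }
}
```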

