Flink 1.11.2 Study Notes (5): Problems with Using Lambda Expressions


Flink's API supports a fluent, chainable style that reads very smoothly. Have a look:

SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
        //set parallelism to 1 so the output is easy to follow
        .setParallelism(1)
        //add the Kafka source
        .addSource(
                new FlinkKafkaConsumer011<>(
                        SOURCE_TOPIC,
                        new SimpleStringSchema(),
                        props))
        //convert to a POJO
        .map((MapFunction<String, WordCountPojo>) value -> {
            WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
            return pojo;
        })
        //configure watermarks and event-time extraction
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(WordCountPojo element) {
                        return element.eventTimestamp;
                    }
                })
        //count the occurrences of each word
        .flatMap(new FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>() {
            @Override
            public void flatMap(WordCountPojo value, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                String word = value.word;
                //compute each window's start time (for display)
                String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
                if (word != null && word.trim().length() > 0) {
                    //collect (map-reduce style)
                    out.collect(new Tuple3<>(word.trim(), 1, windowTime));
                }
            }
        })
        .keyBy(v -> v.f0)
        //1-minute tumbling windows
        .timeWindow(Time.minutes(1))
        //allow data to arrive up to 10 seconds late
        .allowedLateness(Time.seconds(10))
        //sum the counts per word
        .sum(1);

  

In an IntelliJ IDEA environment with JDK 1.8, the IDE may helpfully suggest converting the anonymous FlatMapFunction above into a lambda expression, which looks a bit cleaner:

SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
    .setParallelism(1)
    .addSource(
            new FlinkKafkaConsumer011<>(
                    SOURCE_TOPIC,
                    new SimpleStringSchema(),
                    props))
    .map((MapFunction<String, WordCountPojo>) value -> {
        WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
        return pojo;
    })
    .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                @Override
                public long extractTimestamp(WordCountPojo element) {
                    return element.eventTimestamp;
                }
            })
    .flatMap((FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>) (value, out) -> {
        //rewritten as a lambda
        String word = value.word;
        String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
        if (word != null && word.trim().length() > 0) {
            out.collect(new Tuple3<>(word.trim(), 1, windowTime));
        }
    })
    .keyBy(v -> v.f0)
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.seconds(10))
    .sum(1);

The logic is completely unchanged, but at runtime you hit an error:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

Roughly, this means the lambda does not provide enough type information for Flink to infer the correct types; the suggested workarounds are either to switch back to an anonymous class, or to specify the types explicitly via type information.
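The root cause is Java type erasure: an anonymous class records its generic signature (`FlatMapFunction<WordCountPojo, Tuple3<...>>`) in the compiled class file, while a lambda compiles to a synthetic class that implements only the raw interface, leaving Flink's type extractor nothing to read. A minimal plain-JDK sketch of the difference (the class and method names here are illustrative, not part of Flink):

```java
import java.lang.reflect.ParameterizedType;
import java.util.function.Function;

public class ErasureDemo {
    //returns true if the object's first implemented interface still carries
    //its generic type arguments after compilation
    static boolean hasGenericInfo(Object o) {
        return o.getClass().getGenericInterfaces()[0] instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        //anonymous class: the compiler records Function<String, Integer>
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override public Integer apply(String s) { return s.length(); }
        };
        //lambda: a synthetic class implementing only the raw Function interface
        Function<String, Integer> lam = s -> s.length();

        System.out.println(hasGenericInfo(anon)); // true
        System.out.println(hasGenericInfo(lam));  // false
    }
}
```

This is why the error only appears after the IDE-suggested conversion: the reflective generic information Flink relied on is simply gone.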

 

The fix:

SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
    .setParallelism(1)
    .addSource(
            new FlinkKafkaConsumer011<>(
                    SOURCE_TOPIC,
                    new SimpleStringSchema(),
                    props))
    .map((MapFunction<String, WordCountPojo>) value -> {
        WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
        return pojo;
    })
    .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                @Override
                public long extractTimestamp(WordCountPojo element) {
                    return element.eventTimestamp;
                }
            })
    .flatMap((FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>) (value, out) -> {
        String word = value.word;
        String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
        if (word != null && word.trim().length() > 0) {
            out.collect(new Tuple3<>(word.trim(), 1, windowTime));
        }
    })
    //explicitly specify the return type
    .returns((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class))
    .keyBy(0)
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.seconds(10))
    .sum(1);

With the return type specified explicitly via returns(), and keyBy() adjusted to use a field index instead of a lambda, the job runs normally.

