[Flink] Implementing a Count Window with Managed Keyed State


Let's start with the code:

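import java.util.Arrays;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

public class WordCountKeyedState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Initialize a test stream of words
        DataStreamSource<String> lineDS = env.addSource(new RichSourceFunction<String>() {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean isCanceled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (!isCanceled) {
                    ctx.collect("hadoop flink spark");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanceled = true;
            }
        });

        // Split each line into words and turn every word into a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = lineDS.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            Arrays.stream(line.split(" ")).forEach(word -> out.collect(Tuple2.of(word, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // Group by word; the key (t.f0) is a String
        KeyedStream<Tuple2<String, Integer>, String> keyedWordTupleDS = wordTupleDS.keyBy(t -> t.f0);

        // Count the words
        keyedWordTupleDS.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            // Per-key state: f0 = count since the last emission, f1 = running sum of values
            private transient ValueState<Tuple2<Integer, Integer>> countSumValueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Initialize the ValueState (no default value; null is handled in flatMap)
                ValueStateDescriptor<Tuple2<Integer, Integer>> countSumValueStateDesc = new ValueStateDescriptor<>("countSumValueState",
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})
                );
                countSumValueState = getRuntimeContext().getState(countSumValueStateDesc);
            }

            @Override
            public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // Read the state for the current key once; null means nothing is stored yet
                Tuple2<Integer, Integer> countSum = countSumValueState.value();
                if (countSum == null) {
                    countSum = Tuple2.of(0, 0);
                }

                int count = countSum.f0 + 1;
                int valueSum = countSum.f1 + value.f1;

                countSumValueState.update(Tuple2.of(count, valueSum));

                // Once the count exceeds 3 (i.e. on every 4th record of this key), emit downstream
                if (count > 3) {
                    out.collect(Tuple2.of(value.f0, valueSum));
                    // Reset the count but keep the running sum
                    countSumValueState.update(Tuple2.of(0, valueSum));
                }
            }
        }).print();

        env.execute("KeyedState State");
    }
}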

Code walkthrough:

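1. Build a test data source that emits one line of text per second. To keep testing simple, every line is the same text containing three words.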
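Flink calls `run()` on the source thread and calls `cancel()` from a different thread when the job is cancelled. For that reason the loop flag should be `volatile`. The plain-Java sketch below mimics that lifecycle without any Flink dependency. The class `LineSource` and its `intervalMillis` parameter are hypothetical stand-ins for the anonymous `RichSourceFunction`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java stand-in for the RichSourceFunction above:
// run() loops until cancel() flips the flag from another thread.
class LineSource {
    // volatile so the write in cancel() is visible to the thread running run()
    private volatile boolean isCanceled = false;
    private final long intervalMillis;

    LineSource(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    void run(Consumer<String> collect) throws InterruptedException {
        while (!isCanceled) {
            collect.accept("hadoop flink spark");
            Thread.sleep(intervalMillis);
        }
    }

    void cancel() {
        isCanceled = true;
    }

    public static void main(String[] args) throws Exception {
        LineSource source = new LineSource(10);
        List<String> lines = Collections.synchronizedList(new ArrayList<>());
        // Run the source on its own thread, as Flink does
        Thread worker = new Thread(() -> {
            try {
                source.run(lines::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(50);
        source.cancel(); // called from the main thread
        worker.join();
        System.out.println("emitted " + lines.size() + " lines before cancel");
    }
}
```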


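2. Split each line on spaces and convert every word into a tuple with an initial count of 1. The code calls .returns(Types.TUPLE(Types.STRING, Types.INT)) because the Java compiler erases the generic type of the lambda's Collector parameter. Without that hint, Flink cannot infer the output type.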


3. Group the tuples by word.
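Here is a deliberately simplified plain-Java model of what keyBy guarantees: every record with the same key is routed to the same parallel subtask. The `KeyRouter` class is hypothetical. Its `hashCode()`-modulo-parallelism rule is an assumption made for illustration. Flink actually hashes each key into a key group and then assigns key groups to subtasks.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of keyBy: each record goes to a subtask chosen from its key,
// so all records with the same key end up on the same subtask.
// Assumption: plain hashCode modulo parallelism; Flink uses key groups instead.
class KeyRouter {
    private final int parallelism;

    KeyRouter(int parallelism) {
        this.parallelism = parallelism;
    }

    // floorMod keeps the index non-negative even for negative hash codes
    int subtaskFor(String key) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Distribute words into one bucket per subtask
    List<List<String>> route(List<String> words) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String word : words) {
            buckets.get(subtaskFor(word)).add(word);
        }
        return buckets;
    }

    public static void main(String[] args) {
        KeyRouter router = new KeyRouter(2);
        List<String> words = List.of("hadoop", "flink", "spark", "hadoop", "flink", "spark");
        System.out.println(router.route(words));
    }
}
```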

 

 

4. A custom FlatMap

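Initialize the ValueState. Note that ValueState can only be used on a KeyedStream, and every ValueState value belongs to exactly one key. Whenever a parallel instance accesses the ValueState, Flink takes the key of the record being processed from the context. As a result, the processing logic always reads and writes the ValueState entry for that specific key. Flink does this automatically: the state handle is created once in open(), but value() and update() are scoped to the current key.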
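The per-key scoping can be pictured with a toy model: a single state handle backed by a map, with the current key set before each call. This is not how Flink's state backends are implemented. `ToyKeyedValueState` is a hypothetical class written only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Flink's implementation) of one state handle serving many keys:
// one value is stored per key, and value()/update() act on the current key only.
class ToyKeyedValueState<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    // In Flink the runtime sets the current key before calling flatMap; here it is done by hand
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns null when nothing has been stored for the current key, like ValueState.value()
    V value() {
        return valuesByKey.get(currentKey);
    }

    void update(V value) {
        valuesByKey.put(currentKey, value);
    }

    public static void main(String[] args) {
        ToyKeyedValueState<String, Integer> state = new ToyKeyedValueState<>();
        state.setCurrentKey("flink");
        state.update(1);
        state.setCurrentKey("spark");
        System.out.println(state.value()); // null: "spark" has no value yet
        state.setCurrentKey("flink");
        System.out.println(state.value()); // 1
    }
}
```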


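Note:

The ValueStateDescriptor constructors that take a default value are deprecated. The official recommendation is to use a constructor without a default value "instead and manually manage the default value by checking whether the contents of the state is null." Here is one of the deprecated constructors from the Flink source code: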

/**
 * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific
 * serializer.
 *
 * @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually
 *     manage the default value by checking whether the contents of the state is {@code null}.
 *
 * @param name The (unique) name for the state.
 * @param typeSerializer The type serializer of the values in the state.
 * @param defaultValue The default value that will be set when requesting state without setting
 *     a value before.
 */
@Deprecated
public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) {
    super(name, typeSerializer, defaultValue);
}

5. Implementing the logic

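In flatMap, check whether the ValueState has been initialized for the current key. If it has not, supply an initial value manually. Then increment the count, add the value to the sum, and update the state. Whenever count > 3, the result is sent downstream and the count is cleared. The sum is not reset, so each emitted value is the word's running total.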
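The counting logic can be traced without running a cluster. The sketch below is a plain-Java model, not Flink code. A `HashMap` stands in for the keyed `ValueState`, and a missing entry plays the role of `null`. The hypothetical method `CountTrigger.process` returns the emitted sum, or `null` when nothing is emitted. It keeps the same `count > 3` condition and the same reset rule as the job above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the flatMap logic: per key, a (count, sum) pair lives in a map
// standing in for ValueState. Emits the sum whenever count > 3, then resets the count
// while keeping the running sum (same net effect as the Flink code above).
class CountTrigger {
    private static final int THRESHOLD = 3;
    // key -> {count, valueSum}; a missing entry plays the role of a null ValueState
    private final Map<String, int[]> state = new HashMap<>();

    // Returns the emitted sum, or null when nothing is sent downstream
    Integer process(String word, int value) {
        int[] countSum = state.get(word);
        if (countSum == null) {
            countSum = new int[] {0, 0}; // manual default instead of a deprecated default value
        }
        int count = countSum[0] + 1;
        int valueSum = countSum[1] + value;
        if (count > THRESHOLD) {
            state.put(word, new int[] {0, valueSum}); // reset count, keep sum
            return valueSum;
        }
        state.put(word, new int[] {count, valueSum});
        return null;
    }

    public static void main(String[] args) {
        CountTrigger trigger = new CountTrigger();
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            Integer out = trigger.process("flink", 1);
            if (out != null) {
                emitted.add(out);
            }
        }
        System.out.println(emitted); // [4, 8]
    }
}
```

Feeding the same word eight times emits 4 and then 8. The count restarts after each emission, while the sum keeps growing.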


