Flink狀態之AggregatingState


1、主類

package com.example.demo.flink;

import com.example.demo.flink.impl.CountAverageWithAggregateState;
import com.example.demo.flink.impl.CountAverageWithReduceState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Entry point of a local Flink job that demonstrates keyed {@code AggregatingState}.
 *
 * <p>A fixed set of {@code (key, value)} tuples is keyed by the first tuple field and passed
 * through {@link CountAverageWithAggregateState}; every record that function emits is printed.
 *
 * @author yang
 */
public class TestKeyedAggregateStateMain {

    /**
     * Builds and runs the demo pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Local execution environment that also starts the web UI.
        // Alternative: StreamExecutionEnvironment.getExecutionEnvironment()
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // Set the job parallelism.
        env.setParallelism(16);
        // Bounded demo source: (key, value) pairs.
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));

        // One record is emitted per input element, carrying the text aggregated so far
        // for that element's key:
        //   key 1: (1,Contains3) (1,Contains3and7) (1,Contains3and7and5)
        //   key 2: (2,Contains4) (2,Contains4and2) (2,Contains4and2and6)
        // NOTE(review): with parallelism > 1 the interleaving of lines for different keys
        // is expected to vary between runs.
        dataStreamSource
                // Key by the first tuple field; a KeySelector replaces the deprecated
                // positional keyBy(0).
                .keyBy(element -> element.f0)
                .flatMap(new CountAverageWithAggregateState())
                .print();

        env.execute("TestStatefulApi");
    }

}

2、處理實現類

package com.example.demo.flink.impl;

/**
 * @program: demo
 * @description: AggregatingState demo (per-key aggregation of incoming values)
 * @author: yang
 * @create: 2020-12-28 16:26
 */

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Flat-map function that keeps one {@link AggregatingState} per key and, for every incoming
 * element, emits the key together with the text aggregated so far for that key.
 *
 * <p>{@code AggregatingState<IN, OUT>} operations used here:
 * <ul>
 *   <li>{@code add(IN)} - folds a value into the state through the configured
 *       {@link AggregateFunction}</li>
 *   <li>{@code get()} - returns the current aggregation result</li>
 * </ul>
 * IN is the type of the values added to the state, OUT is the type of the result.
 *
 * <p>For the values 3, 7, 5 arriving under one key, the emitted texts are
 * {@code Contains3}, {@code Contains3and7}, {@code Contains3and7and5}.
 */
public class CountAverageWithAggregateState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {

    /** Text of an accumulator to which no value has been added yet. */
    private static final String PREFIX = "Contains";

    /** Text placed between two consecutive values. */
    private static final String SEPARATOR = "and";

    /** State handle; assigned in {@link #open(Configuration)}, hence transient. */
    private transient AggregatingState<Long, String> aggregatingState;

    /**
     * Registers the aggregating state with the runtime context.
     *
     * @param parameters configuration passed by the runtime (unused)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        AggregatingStateDescriptor<Long, String, String> descriptor =
                new AggregatingStateDescriptor<>(
                        "AggregatingDescriptor", new ConcatAggregate(), String.class);

        aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
    }

    /**
     * Adds the element's value to the state and emits {@code (key, aggregated text)}.
     *
     * @param ele incoming {@code (key, value)} pair
     * @param collector receives one output record per input element
     * @throws Exception if state access fails
     */
    @Override
    public void flatMap(Tuple2<Long, Long> ele, Collector<Tuple2<Long, String>> collector) throws Exception {
        aggregatingState.add(ele.f1);
        collector.collect(Tuple2.of(ele.f0, aggregatingState.get()));
    }

    /**
     * Concatenates the added values into {@code Contains<v1>and<v2>and...}.
     *
     * <p>Declared as a static nested class so it does not hold a reference to the enclosing
     * function instance.
     */
    private static final class ConcatAggregate implements AggregateFunction<Long, String, String> {

        /** Creates the empty accumulator. */
        @Override
        public String createAccumulator() {
            return PREFIX;
        }

        /** Appends {@code value}; the separator is skipped for the first value. */
        @Override
        public String add(Long value, String accumulator) {
            return PREFIX.equals(accumulator)
                    ? accumulator + value
                    : accumulator + SEPARATOR + value;
        }

        /** Returns the accumulator unchanged as the result. */
        @Override
        public String getResult(String accumulator) {
            return accumulator;
        }

        /**
         * Combines two accumulators into one that has the same shape {@link #add} produces.
         *
         * <p>The original author noted that this method was not observed being called while
         * debugging this job. Assumes both arguments were produced by
         * {@link #createAccumulator()} / {@link #add} and therefore start with the prefix.
         */
        @Override
        public String merge(String a, String b) {
            if (PREFIX.equals(a)) {
                return b;
            }
            if (PREFIX.equals(b)) {
                return a;
            }
            // Drop the second accumulator's prefix so the result stays a valid accumulator.
            return a + SEPARATOR + b.substring(PREFIX.length());
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM