1、主類
package com.example.demo.flink; import com.example.demo.flink.impl.CountAverageWithAggregateState; import com.example.demo.flink.impl.CountAverageWithReduceState; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @program: demo * @description: valuestate * @author: yang * @create: 2020-12-28 15:46 */ public class TestKeyedAggregateStateMain { public static void main(String[] args) throws Exception{ //獲取執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); //StreamExecutionEnvironment.getExecutionEnvironment(); //設置並行度 env.setParallelism(16); //獲取數據源 DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 5L), Tuple2.of(2L, 2L), Tuple2.of(2L, 6L)); // 輸出: //(1,5.0) //(2,4.0) dataStreamSource .keyBy(0) .flatMap(new CountAverageWithAggregateState()) .print(); env.execute("TestStatefulApi"); } }
2、處理實現類
package com.example.demo.flink.impl; /** * @program: demo * @description: valuestate * @author: yang * @create: 2020-12-28 16:26 */ import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; /** * ValueState<T> :這個狀態為每一個 key 保存一個值 * value() 獲取狀態值 * update() 更新狀態值 * clear() 清除狀態 * * IN,輸入的數據類型 * OUT:數據出的數據類型 */ public class CountAverageWithAggregateState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> { private AggregatingState<Long,String> aggregatingState; /**初始化*/ @Override public void open(Configuration parameters) throws Exception { AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor<Long,String,String>("AggregatingDescriptor", new AggregateFunction<Long,String,String>() { //變量初始化 @Override public String createAccumulator() { return "Contains"; } //數據處理 @Override public String add(Long value, String accumulator) { return "Contains".equals(accumulator) ? accumulator + value : accumulator + "and" + value; } //返回值函數 @Override public String getResult(String accumulator) { return accumulator; } //好像無用.......debug並沒有使用到該函數 @Override public String merge(String o, String acc1) { return o + "and1111" + acc1; } },String.class); aggregatingState = getRuntimeContext().getAggregatingState(descriptor); } @Override public void flatMap(Tuple2<Long, Long> ele, Collector<Tuple2<Long, String>> collector) throws Exception { aggregatingState.add(ele.f1); collector.collect(Tuple2.of(ele.f0,aggregatingState.get())); } }