Keyby算子(flink)


flink的keyby算子作用是把相同key的數據發送到一個分區(即一個subtask里面去),采用的是哈希分區方法。

用法多樣,主要整理了下圖中的四種用法。

第一種是 key(Interger) 用法,傳入一個整數,這個整數對應的是元組中的元素順序是第幾個,(注:可以是多個key,不一定只有一個,下圖為了方便只寫了一種)(只適合上一級傳過來的數據是元組類型的)

第二種是 Key(String)用法,這個String 只能是f0,f1,,,,,原因是上一級的是元組類型,而元組本身已經為變量寫了變量名了,可以看下圖。

前倆種都已經過時了,第三種是new Keyseletor方法來構造一個匿名內部類來返回key

最后一種是 在flink自帶的類型不好用的時候,自己封裝一個bean,在上一級的傳遞值中new,然后按照之前元組同樣的方法來選定Key進行分區

  public static void main(String[] args) {

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = executionEnvironment.socketTextStream("Hadoop02", 8088);


        SingleOutputStreamOperator<Tuple2<String, Integer>> map1 = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        SingleOutputStreamOperator<province> map2 = source.map(new MapFunction<String, province>() {
            @Override
            public province map(String s) throws Exception {
                return new province(s);
            }
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = map1.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream1 = map1.keyBy("f0");
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = map1.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });

        KeyedStream<province, Tuple> mapname = map2.keyBy("name");

    }
public static class province {
    String name;

    public province(){}
    public province(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "province{" +
                "name='" + name + '\'' +
                '}';
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM