flink的keyby算子作用是把相同key的數據發送到一個分區(即一個subtask里面去),采用的是哈希分區方法。
用法多樣,主要整理了下圖中的四種用法。
第一種是 key(Interger) 用法,傳入一個整數,這個整數對應的是元組中的元素順序是第幾個,(注:可以是多個key,不一定只有一個,下圖為了方便只寫了一種)(只適合上一級傳過來的數據是元組類型的)
第二種是 Key(String)用法,這個String 只能是f0,f1,,,,,原因是上一級的是元組類型,而元組本身已經為變量寫了變量名了,可以看下圖。
前倆種都已經過時了,第三種是new Keyseletor方法來構造一個匿名內部類來返回key
最后一種是 在flink自帶的類型不好用的時候,自己封裝一個bean,在上一級的傳遞值中new,然后按照之前元組同樣的方法來選定Key進行分區
public static void main(String[] args) {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = executionEnvironment.socketTextStream("Hadoop02", 8088);
SingleOutputStreamOperator<Tuple2<String, Integer>> map1 = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return Tuple2.of(s, 1);
}
});
SingleOutputStreamOperator<province> map2 = source.map(new MapFunction<String, province>() {
@Override
public province map(String s) throws Exception {
return new province(s);
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = map1.keyBy(0);
KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream1 = map1.keyBy("f0");
KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = map1.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
});
KeyedStream<province, Tuple> mapname = map2.keyBy("name");
}
public static class province {
String name;
public province(){}
public province(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "province{" +
"name='" + name + '\'' +
'}';
}
}