Flink's keyBy operator sends all records with the same key to the same partition (that is, the same subtask), using hash partitioning.
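To make the "same key always lands on the same subtask" guarantee concrete, here is a minimal sketch of the idea behind hash partitioning. This is not Flink's actual internal code (Flink hashes keys into key groups and then assigns key groups to subtasks); it only illustrates that a deterministic hash-and-modulo computation maps a given key to one fixed index.

// Illustration only, NOT Flink's internal implementation.
// A deterministic hash means every record with the same key gets the same subtask index.
public class HashPartitionSketch {
    // The parallelism of 4 is an arbitrary value chosen for the example.
    static int subtaskFor(Object key, int parallelism) {
        return Math.abs(key.hashCode()) % parallelism;
    }

    public static void main(String[] args) {
        System.out.println(subtaskFor("beijing", 4));  // same key -> same index, every time
        System.out.println(subtaskFor("beijing", 4));
        System.out.println(subtaskFor("shanghai", 4)); // a different key may map to a different index
    }
}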
There are several ways to use it; the four collected here are all shown in the example below.
The first is keyBy(int): you pass an integer giving the position of the key field within the tuple. (Note: you can pass several positions, not just one; the example below uses a single key for simplicity.) This form only works when the upstream data is a tuple type.
The second is keyBy(String): the String can only be f0, f1, ..., because the upstream type is a tuple and the tuple class already names its fields that way; see the example below.
Both of these forms are deprecated. The third is to build an anonymous inner class implementing KeySelector and return the key from it.
The last one is for when Flink's built-in types are not a good fit: define your own bean, construct it in the upstream operator, and then pick the key field for partitioning the same way as with tuples.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Enclosing class name is illustrative; the original post only shows the method and the POJO.
public class KeyByDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = executionEnvironment.socketTextStream("Hadoop02", 8088);

        // Wrap each input line into a (word, 1) tuple.
        SingleOutputStreamOperator<Tuple2<String, Integer>> map1 = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        // Wrap each input line into a custom POJO.
        SingleOutputStreamOperator<province> map2 = source.map(new MapFunction<String, province>() {
            @Override
            public province map(String s) throws Exception {
                return new province(s);
            }
        });

        // 1) Key by tuple position (deprecated): 0 means the first field of the tuple.
        KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = map1.keyBy(0);

        // 2) Key by tuple field name (deprecated): tuple fields are named f0, f1, ...
        KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream1 = map1.keyBy("f0");

        // 3) Key by a KeySelector implemented as an anonymous inner class.
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = map1.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });

        // 4) Key a custom POJO by field name.
        KeyedStream<province, Tuple> mapname = map2.keyBy("name");

        // Added here so the job actually runs: attach a sink and submit the job.
        tuple2StringKeyedStream.print();
        executionEnvironment.execute("keyBy demo");
    }

    // Flink POJO: public class, public no-arg constructor, and getters/setters for its fields.
    public static class province {
        String name;

        public province() {
        }

        public province(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "province{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }
}
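Since keyBy(int) and keyBy(String) are deprecated, newer Flink versions recommend the KeySelector form, which can also be written as a lambda or method reference. A minimal sketch, reusing map1 and map2 from the example above:

// Non-deprecated equivalents of the position / field-name variants above (assumes map1 and map2 exist).
KeyedStream<Tuple2<String, Integer>, String> byWord = map1.keyBy(t -> t.f0);   // key by the tuple's first field
KeyedStream<province, String> byName = map2.keyBy(province::getName);          // key by the POJO's name field

With these, the key type of the resulting KeyedStream is the real key type (String here) rather than the generic Tuple wrapper produced by the deprecated variants.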