Apache Flink 零基础入门(七)Flink中keyBy三种方式指定key


来源于  https://blog.csdn.net/vincent_duan/article/details/100880838

keyBy 如何指定key
不管是stream还是batch处理,都有一个keyBy(stream)和groupBy(batch)操作。那么该如何指定key?

Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.

 一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定义一个key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允许数据按照key进行分组。

DataSet

DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
DataStream

DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
类似于mysql中的join操作:select a.* , b.* from a join b on a.id=b.id

这里的keyBy就是a.id=b.id

有哪几种方式定义Key?
方式一:Tuple
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
可以传字段的位置

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
可以传字段位置的组合

这对于简单的使用时没问题的。但是对于内嵌的Tuple,如下所示:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
如果使用keyBy(0),那么他就会使用整个Tuple2<Integer, Float>作为key,(因为Tuple2<Integer, Float>是Tuple3<Tuple2<Integer, Float>,String,Long>的0号位置)。如果想要指定key到Tuple2<Integer, Float>内部中,可以使用下面的方式。

方式二:字段表达式
我们可以使用基于字符串字段表达式来引用内嵌字段去定义key。

之前我们的算子写法是这样的:

text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for(String token: tokens) {
if(token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
其中的new FlatMapFunction<String, Tuple2<String, Integer>>表示输入是一个String,输出是一个Tuple2<String, Integer>。这里我们重新定义一个内部类:

public static class WC {
private String word;
private int count;

public WC() {
}

public WC(String word, int count) {
this.word = word;
this.count = count;
}

@Override
public String toString() {
return "WC{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}

public String getWord() {
return word;
}

public void setWord(String word) {
this.word = word;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}
}
修改算子的写法:

text.flatMap(new FlatMapFunction<String, WC>() {
@Override
public void flatMap(String value, Collector<WC> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new WC(token, 1));
}
}
}
}).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);
将原来的输出Tuple2<String, Integer>,修改为输出WC类型;将原来的keyBy(0)修改为keyBy("word");将原来的sum(1)修改为sum("count")

因此,在这个例子中我们有一个POJO类,有两个字段分别是"word"和"count",可以传递字段名到keyBy("")中。

语法:

字段名一定要与POJO类中的字段名一致。一定要提供默认的构造函数,和get与set方法。
使用Tuple时,0表示第一个字段
可以使用嵌套方式,举例如下:
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
"count",指向的是WC中的字段count
"complex",指向的是复杂数据类型,会递归选择所有ComplexNestedClass的字段
"complex.word.f2",指向的是Tuple3中的最后一个字段。
"complex.hadoopCitizen",指向的是Hadoop IntWritable type
scala写法:

object StreamingWCScalaApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 引入隐式转换
import org.apache.flink.api.scala._

val text = env.socketTextStream("192.168.152.45", 9999)
text.flatMap(_.split(","))
.map(x => WC(x,1))
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
.print()
.setParallelism(1)

env.execute("StreamingWCScalaApp");
}
case class WC(word: String, count: Int)
}
 方式三:key选择器函数
.keyBy(new KeySelector<WC, Object>() {
@Override
public Object getKey(WC value) throws Exception {
return value.word;
}
})

————————————————
版权声明:本文为CSDN博主「vincent_duan」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/vincent_duan/java/article/details/100880838


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM