Having covered Flink's data sources and sinks, we've seen the head and the tail — data flowing in and data flowing out — but not the processing in between. And Flink's real substance lies precisely in that middle stage.

The middle processing stage is fairly complex, and so far I've only looked at part of it. Let's start with the simplest and most commonly used operators: map, flatMap, and filter.
map
map on a Flink DataStream is very similar to map in Java 8 streams: both are used for element-wise transformations. Here is map's implementation:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(
            (MapFunction) this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
    return this.transform("Map", outType, new StreamMap((MapFunction) this.clean(mapper)));
}
We can see that:
1. It returns a SingleOutputStreamOperator<R>. This is a fundamental type that many DataStream methods return, e.g. map, flatMap, filter, process, and so on.
2. It is ultimately implemented by calling the transform method. Here is transform's implementation:
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    this.transformation.getOutputType();
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation(
            this.transformation, operatorName, operator, outTypeInfo, this.environment.getParallelism());
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
    this.getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}
Hmm, that's not terribly enlightening — let's just look at how to use it!
@Slf4j
public class KafkaUrlSinkJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin",   // topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map: transform one stream into another, here from String --> UrlInfo
                .map(string -> JSON.parseObject(string, UrlInfo.class));

        // ... (the transformations and sinks shown below go here)
    }
}
As you can see, Kafka delivers String values; after the map transformation the stream becomes a SingleOutputStreamOperator<UrlInfo> instead of a SingleOutputStreamOperator<String>.
map never drops records: however many records come in, exactly that many come out after processing. It is purely a transformation.
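To make the one-in, one-out behaviour concrete, here is a minimal, self-contained sketch (not from the original article; the class name and sample values are made up) that maps three integers to strings — three records go in and exactly three come out:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // map is strictly one-to-one: each input element produces exactly one output element
        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, String>() {
                    @Override
                    public String map(Integer value) {
                        return "value-" + value;
                    }
                })
                .print();   // prints the mapped values: value-1, value-2, value-3

        env.execute("map demo");
    }
}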
flatmap
flatMap takes a nested collection and flattens it into a non-nested one. Let's look at an example, again using the Kafka data source above:
// build a nested structure: each UrlInfo becomes a List<UrlInfo> with two entries
SingleOutputStreamOperator<List<UrlInfo>> listDataStreaamSource = dataStreamSource
        .map(urlInfo -> {
            List<UrlInfo> list = Lists.newArrayList();
            list.add(urlInfo);

            UrlInfo urlInfo1 = new UrlInfo();
            urlInfo1.setUrl(urlInfo.getUrl() + "-copy");
            urlInfo1.setHash(DigestUtils.md5Hex(urlInfo1.getUrl()));
            list.add(urlInfo1);
            return list;
        }).returns(new ListTypeInfo(UrlInfo.class));

listDataStreaamSource.addSink(new PrintSinkFunction<>());
Notes:
1. Note the returns call here. Without it the job fails at runtime, because the lambda loses part of the generic type information, and you get an exception like:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

A fuller explanation of the error (quoted): "In order to execute a program, Flink needs to know the type of the values that are processed because it needs to serialize and deserialize them. Flink's type system is based on TypeInformation which describes a data type. When you specify a function, Flink tries to infer the return type of that function — in the case of a FlatMapFunction, the type of the objects that are passed to the Collector. Unfortunately, some lambda functions lose this information due to type erasure, so Flink cannot automatically infer the type. Therefore, you have to explicitly declare the return type."
However, since the result here is a List<UrlInfo>, you can't simply write List<UrlInfo>.class — Java has no such syntax for parameterized types. Flink therefore offers several overloads of returns; the one used here is

    public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo)

and ListTypeInfo is a List-specific implementation of the TypeInformation abstract class.
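As the exception text above also suggests, an alternative workaround (a sketch along the same lines, not from the original article, reusing the same UrlInfo, Lists and DigestUtils helpers) is to implement MapFunction as an anonymous class instead of a lambda. The anonymous class keeps its generic signature, so Flink can extract the output type itself and no returns call is needed (it falls back to its generic serializer for the List):

// drop-in replacement for the lambda + returns version above
SingleOutputStreamOperator<List<UrlInfo>> listDataStreaamSource = dataStreamSource
        .map(new MapFunction<UrlInfo, List<UrlInfo>>() {
            @Override
            public List<UrlInfo> map(UrlInfo urlInfo) {
                List<UrlInfo> list = Lists.newArrayList();
                list.add(urlInfo);

                UrlInfo copy = new UrlInfo();
                copy.setUrl(urlInfo.getUrl() + "-copy");
                copy.setHash(DigestUtils.md5Hex(copy.getUrl()));
                list.add(copy);
                return list;
            }
        });

Either version produces the same stream; the rest of this post keeps the lambda + returns form.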
Running this together with the KafkaSender from the earlier post produces output like:
kafkaSender:
2019-01-15 20:21:46.650 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
2019-01-15 20:21:46.653 [main] INFO myflink.KafkaSender - send msg:{"domain":"so.com","id":0,"url":"http://so.com/1547554906650"}
KafkaUrlSinkJob
[UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]
That is, each UrlInfo has been expanded into a List<UrlInfo>.
Now let's see how to use flatMap:
...

SingleOutputStreamOperator<UrlInfo> flatSource = listDataStreaamSource.flatMap(
        new FlatMapFunction<List<UrlInfo>, UrlInfo>() {
            @Override
            public void flatMap(List<UrlInfo> urlInfos, Collector<UrlInfo> collector) throws Exception {
                // emit every element of the nested list; Collector.collect() is not thread-safe,
                // so iterate sequentially rather than with parallelStream()
                urlInfos.forEach(collector::collect);
            }
        });

flatSource.addSink(new PrintSinkFunction<>());

...
Of course this can also be written as a lambda (note that the lambda version must declare its return type explicitly):
SingleOutputStreamOperator<UrlInfo> flatSource = listDataStreaamSource.flatMap(
        (FlatMapFunction<List<UrlInfo>, UrlInfo>) (urlInfos, collector) ->
                urlInfos.forEach(collector::collect))
        .returns(UrlInfo.class);
Here is the printed output:
2> [UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]
1> [UrlInfo(id=0, url=http://so.com/1547554903640, hash=null), UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)]
1> UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)
1> UrlInfo(id=0, url=http://so.com/1547554903640, hash=null)
2> UrlInfo(id=0, url=http://so.com/1547554906650, hash=null)
2> UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)
In other words, flatMap emits its output through a Collector, and that output is only one level deep: when the input is nested, the elements are flattened out into individual records.
Of course, this isn't limited to nested collections. Since flatMap places no restriction on how many records it emits per input, it can also be used to fan data out, for example:
dataStream.flatMap((FlatMapFunction<String, String>) (value, out) -> {
    for (String word : value.split(" ")) {
        out.collect(word);
    }
});
Here each string is split on spaces, and the resulting words form a new DataStream.
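Tying this back to the type-information issue discussed earlier, here is a minimal, self-contained sketch of the same word-splitting flatMap (not from the original article; the class name and sample sentences are made up). Because the lambda erases the Collector's generic type, the output type is declared with returns:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordSplitJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello flink", "map flatmap filter")
                // one input line can produce many output words
                .flatMap((FlatMapFunction<String, String>) (value, out) -> {
                    for (String word : value.split(" ")) {
                        out.collect(word);
                    }
                })
                // the lambda loses the Collector's generic type, so declare it explicitly
                .returns(Types.STRING)
                .print();

        env.execute("word split demo");
    }
}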
filter
As the name suggests, filter is used to filter data. We keep building on the code above; to avoid noise in the output, comment out the two addSink calls above and add the following:
// filter on the domain field, keeping only records whose domain is BAIDU
SingleOutputStreamOperator<UrlInfo> filterSource = flatSource.filter(urlInfo -> {
    if (StringUtils.equals(UrlInfo.BAIDU, urlInfo.getDomain())) {
        return true;
    }
    return false;
});

filterSource.addSink(new PrintSinkFunction<>());
This drops data from all other domains and keeps only the BAIDU records. I won't paste the output here, but it confirms that filter behaves as expected.
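For completeness, here is a minimal, self-contained filter sketch (not from the original article; the class name and sample numbers are made up). filter never modifies records — it only keeps those for which the predicate returns true:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // keep only the even numbers; odd ones are simply dropped
        env.fromElements(1, 2, 3, 4, 5, 6)
                .filter(i -> i % 2 == 0)
                .print();   // prints 2, 4 and 6

        env.execute("filter demo");
    }
}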
Author: AlanKim
Link: https://www.jianshu.com/p/7bbf14996d39
Source: Jianshu (簡書)
Copyright belongs to the author. For commercial reproduction, please contact the author for authorization; for non-commercial reproduction, please credit the source.