在使用Java編寫apache-flink程序的時候相信很多新手都遇到下面這樣的異常;
org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(DemoApp.java:29)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:45
函數返回類型由於類型刪除,無法自動確定類型;可使用returns方法或使用函數實現ResultTypeQueryable接口;
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
Collector泛型類型參數丟失。在使用Java泛型時lambda方法無法提供足夠的信息來進行自動進行類型提取。一個簡單的解決方案是使用匿名內部類代替來實現FlatMapFunction接口,不然只能使用類型信息顯式指定類型;
拋出的上面這兩個異常描述的內容其實很明確了,簡單來說就是:在實現FlatMapFunction時使用lambda表達式導致了Collector變量的泛型類型參數丟(由於類型刪除),簡單的解決方案是把lambda表達式換成匿名內部類或者顯式指定類型(使用returns方法或實現ResultTypeQueryable接口);
下面簡單談談Java的類型擦除與flink的顯式指定類型;
Java類型擦除
Java的泛型被很多人詬病稱為“偽泛型”,也是因為類型擦除這個原因,泛型在Java中就是屬於語法糖;
在Java中JVM虛擬機層面並不存在泛型的概念,Java在編譯階段把泛型的類型參數給擦除掉了,在運行階段並沒有泛型的概念;
public class Data<T> {
private T obj;
public T getObj() {
return obj;
}
public void setObj(T obj) {
this.obj = obj;
}
}
如上類,在經過Java編譯成為class文件后其中的類型參數T將被擦除,字段obj變成了Object類型,兩個get、set方法中的T也都換成了Object類型;
泛型實現主要有兩種:
Code sharing:一個原始類的泛型類型只有一份目標代碼。
Code specialization:對每個泛型類型都生成不同的代碼。
Java屬於第一種,C#與C++屬於第二種,兩種實現各有春秋吧,這里不討論;
為了保證Java的多態特性編譯器在進行類型擦除時還可能會生成橋接方法用於保證類型擦除所導致子類與父類方法實現不一致問題;
Flink中的泛型與lambda
stream.flatMap(new FlatMapFunction<Integer, String>() {
@Override
public void flatMap(Integer value, Collector<String> out) throws Exception {
System.out.println(value);
}
});
在Flink中使用各種算子的時候可能會有類似上面面這種用法,上面這種方式使用並沒有什么問題,這里的FlatMapFunction就是一個泛型接口,使用了匿名內部類實現了該接口並傳遞給了flatMap算子;
stream.flatMap((FlatMapFunction<Integer, String>) (value, out) -> {
System.out.println(value);
})
也有的人直接使用lambda表達式實現FlatMapFunction接口傳遞給flatMap算子,但這時候很多新手估計會發現程序運行的時候報錯了,拋出了本文最開始的那兩個異常;
為什么使用匿名內部類就沒問題,而使用lambda表達式就不行報錯了,其實異常信息已經描述很清楚了。這里簡單看看為什么匿名內部類可以,lambda表達式不可以,使用returns方法或實現ResultTypeQueryable接口也可以;
上面介紹了在Java中會對泛型信息進行類型參數擦除,但在這里為啥使用匿名內部類實現FlatMapFunction時卻還是可以獲取得到泛型參數?
其實Java中編譯時的泛型類型擦除並不是把所以泛型相關的信息全部擦干干凈凈,Javac編譯時擦除的只是結構化之外(程序執行流)的信息這部分信息存儲在字節碼的Code屬性中,類、字段、方法的泛型類型參數元數據都會被保留下來,這些存儲在Signature屬性中;可通過反射得到相關的泛型參數信息;
s.flatMap(new FlatMapFunction<String, Integer>() {
@Override
public void flatMap(String value, List<Integer> out) {
System.out.println("stu");
}
});
而lambda表達式實現FlatMapFunction卻獲取不到泛型參數,是的。
匿名內部類會編譯成相關的類字節碼存儲在class文件中,而lambda表達式卻也只是Java的語法糖並不會存在相關的類字節碼,只會在lambda表達式運行時調用invokedynamic指令執行邏輯。lambda表達式丟失了更多的類型信息,也就導致了使用lambda表達式獲取不到泛型類型參數;
s.flatMap((FlatMapFunction<String, Integer>) (value, out) ->
System.out.println("stu"));
Flink中使用lambda后的寫法
其實上面異常信息已經說得非常清楚了,調用returns方法或實現ResultTypeQueryable接口,這里就簡單說這兩種用法;
returns方法
調用該方法的用法也比較簡單,就是返回的Collector需要哪個泛型類型參數你就調用returns方法注冊哪種類型,調用returns方法一定是要在某個算子之后緊接着第一個調用,簡單理解就是未某個算子注冊返回類型;
stream.flatMap((FlatMapFunction<Integer, String>) (value, out) -> {
System.out.println(value);
})
.returns(String.class)
ResultTypeQueryable接口
實現此接口就可以告訴系統此算子的返回值類型,實現了此接口的優先級最高,不會再通過反射去獲取返回值類型。還可以根據類型參數的不同使用不同的返回值類型;實現此接口可定制化程度很高、靈活。Flink kafka相關的連接器中就是用了這種模式。
public class FlatFun implements ResultTypeQueryable<String>, FlatMapFunction<Integer, String> {
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(String.class);
}
@Override
public void flatMap(Integer value, Collector<String> out) {
out.collect(String.valueOf(value));
System.out.println("flatFun");
}
}
stream.flatMap(new FlatFun())
.print();