1.序言
Flink在內部以二進制的格式將數據保存,由於普通的Java對象類型和內部二進制格式不一致,需要一套相互轉換機制來進行序列化和反序列化。
2.DataStream類型系統
2.1 物理類型
Flink支持的物理類型如下圖所示:
分為基礎類型、數組類型、復合類型、輔助類型、泛型和其他類型,如果用戶需要自定義類型的話,需要注冊該類型並自己實現序列化和反序列化的方法。
對於沒有提供的自定義類型,Flink為了程序正常運行,會交給Kryo進行序列化,缺點是序列化和反序列化效率較低。
2.2 邏輯類型
邏輯類型是物理類型的描述,Flink運行時會根據邏輯類型進行數據的序列化和反序列化。
在Flink中使用TypeInformation來描述邏輯類型,該類是一個抽象類,所有邏輯類型繼承該類,分類如下圖所示:
在序列化過程中,所有邏輯類型都必須實現createSerializer(ExecutionConfig config)方法來創建序列化器。
2.3 類型推斷
開發者使用物理類型,Flink運行時使用邏輯類型,所以需要從物理類型轉換為邏輯類型,Java使用反射機制獲取Function的輸入輸出。
2.3.1 類型提取時機
在使用DataStream接口的時候,會觸發類型的提取,如下面map()方法所示:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// 提取類型
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
return map(mapper, outType);
}
2.3.2 自動類型推斷
Flink會首先進行類型推斷,如果用盡各種手段都無法推測出泛型信息時,用戶需要傳入一個類型提示TypeHint,來獲取泛型的類型信息,如下:
TypeInformation.of(new TypeHint<Tuple2<Integer,String>>(){})
2.4 顯式聲明
一般情況下,可以通過TypeInformation.of()方法來顯式創建一個類型信息的對象,如下:
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
2.5 存在問題
2.5.1 類型提取
由於泛型的類型擦除導致類型提取不能總是有效,所以有時候需要自己手動指定。
2.5.2 使用Flink自帶的Kryo
使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而不要使用com.esotericsoftware.kryo.serializers.JavaSerializer,防止和Flink不兼容。
3.SQL類型系統
Flink1.9之后引入了新的SQL類型系統,解決了DataStream在SQL中的兼容性、精度、類型等問題。
Flink SQL中使用DataType中的LogicalType類型系統來描述類型信息,在Flink SQL執行時,最終還是要轉換為TypeInformation。
Row表示表中的一行數據或者一條記錄,在1.9版本之前,Flink SQL使用org.apache.flink.types.Row,在1.9版本之后,使用org.apache.flink.table.dataformat.BaseRow及其子類,下面主要介紹Blink Row。
3.1 Blink Row
Blink Row分為列式存儲和行式存儲,結構如下所示:
區別如下:
- BinaryRow:表示二進制形式存儲,由定長部分和不定長部分組成,定長部分在一個MemorySegment中
- NestedRow:同上,但是定長部分可以在不同MemorySegment中
- UpdatableRow:保存所有字段數據,更新數據的時候使用一個數組記錄被修改字段的最新值,讀取數據的時候去判斷是否被更新過再讀取
- ObjectArrayRow:對象存儲,多對象的序列化和反序列化過程,成本比二進制高,兩個實現類,GenericRow表示原始類型,BoxedWrapperRow表示對象類型
- JoinedRow:Join過程中兩行數據的邏輯結構
- ColumnarRow:二進制列式存儲,不同於前面都是行式存儲