flink支持的數據類型
Flink對DataSet和DataStream中可使用的類型加了一些約束。原因是系統可以通過分析這些類型來確定有效的執行策略和選擇不同的序列化方式。有7種不同的數據類型:
1.java Tuple 和 Scala Case類:
2.java POJO
(指那些沒有從任何類集成,也沒有實現任何接口,更沒有被其他框架侵入的java對象
1.是公共類 2.無參構造是公共的 3.所有的屬性是可獲得的 4.自斷必須是flink支持的。Flink會用Avro來序列化任意對象。Flink會分析POJO類型結構獲知POJO字段。POJO類型比一般類型好用。此外,Flink訪問POJO要比一般類型更高效)
3.基本類型
Flink支持java和scala所有的基本數據類型,比如integer,String和Double
4.通用類
Flink支持大多數的java scala類,包含不能序列化字段的類在增加一些限制后也可以支持。遵循java Bean規范的類一般都可以使用
所有不能視為POJO的類Flink都會當做一般類處理。這些數據類型被視作黑箱,其內容是不可見的。通用類使用Kryo進行序列化和反序列化
5.值
通過實現org.apache.flinktypes.Value接口的read和write方法提供自定義代碼來序列化和反序列化,而不是使用通用的序列化框架
Flink預定義的值類型與原生數據類型是一一對應的(例如:ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,Char
Value,BooleanValue)。這些值類型作為原生數據類型的可變變體,他們的值是可以改變的。允許程序重用對象從而緩解GC壓力
6.hadoop Writables
7.特殊類型
scala的Either Option和try
java API有自己Either實現
延伸支持
java 范型的類型擦除機制
Flink累加器的實現
累加器的內置類型(計數)IntCounter,LongCounter和DoubleCounter
Histogram
如何使用累加器
一.在自定義的轉換操作里面創建累加器對象
private IntCounter numLines=new IntCounter();
二.注冊累加器對象,通常在rich function的open()方法中。這里你還需要定義累加器的名字
(繼承RichFlatMapFunction實現open和close方法,只執行一次里面進行注冊)
getRuntimeContext().addAccumlator("num-lines",this.numLines)
三.在operator函數的任何地方使用累加器,包括在open()和close()方法中
this.numLines.add(1);
第四步:結果存儲在JobExecutionResult里:
JobExecutionResult JobExecutionResult = env.execute(“Flink Batch java API Skeleton”)
myJobExecutionResult.getAccumlatorResult("num-lines")