- Flink程序是實現分布式集合轉換的常規程序。集合最初是從源創建的。通過接收器(slink)返回結果,接收器可以將數據寫到某個文件或stdout。Flink可以在各種環境(context)中運行,本地JVM或集群。
1.數據集和數據流
- Flink用特殊的類
DataSet
andDataStream來表示程序中的數據。可以認為他們是可以包含重復數據的不可變數據集合。在DataSet中數據是有限的,而在DataStream中數據是無限的。
- 這些集合不同於java里的集合,他們是不可變的,一旦被創造就不能改動,也不能簡單的抽查里面的元素。
- 最初的集合是通過在Flink程序里添加一個源被創造的,新的集合是使用API方法(如
map
,filter
)通過轉換得到的。
2.剖析一個Flink程序
- 每個程序包含相同的基本部分:
- 獲得一個執行環境(execution environment).
- 加載/創建初始數據。
- 指定轉換這些數據。
- 指定放置計算結果的位置。
- 觸發程序執行。
- StreamExecutionEnvironment是所有Flink程序的基礎。可以通過以下靜態方法獲得:
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles)
通常只需要使用getExecutionEnvironment()方法,因為這將根據環境做出正確的事:如果你執行你的程序在IDE上或着作為一個普通Java程序,它將創建一個本地環境,將在本地機器上執行程序。如果您從您的程序創建了一個JAR文件,並通過命令行調用它,Flink集群管理者將執行你的main方法並且getExecutionEnvironment()將返回一個在一個集群上執行程序的執行環境。
- 用於指定數據源,執行環境有幾個方法來從文件讀取:你可以逐行閱讀,像CSV文件,或者使用完全自定義數據輸入格式。要讀取一個文本文件的順序,您可以使用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");
這將給你一個數據流,然后,您可以通過轉換創建新的派生數據流。
- 你可以通過調用DataStream數據轉換方法轉換。例如,一個map轉換看起來像這樣:
DataStream<String> input = ...; DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
通過將原始集合中的每個字符串轉換為整數,這將創建一個新的數據流。
- 一旦你有了一個包含你的最終結果的數據流,通過創建一個sink可以把它寫一個外部系統。這些是一些創建一個sink的的方法示例:
writeAsText(String path) print()
- 一旦你指定完整程序,你需要通過調用StreamExecutionEnvironment上的execute()去觸發程序執行。根據ExecutionEnvironment的類型,執行將會被觸發在你的本地機器上或提交程序在集群上執行。這個execute()方法返回一個JobExecutionResult,它包含執行時間和累加結果。
3.延遲計算
- 所有Flink程序都是延遲執行的:當一個程序的main方法被執行時,數據加載和轉換沒有被立刻發生,相反,每個操作被創造,添加到該程序的計划里。當執行明確地被
execute()
觸發時這些操作被真正執行。
4.指定的keys
- 一些轉換(join, coGroup, keyBy, groupBy)要求的集合上定義的一個key。其他的轉換(Reduce, GroupReduce, Aggregate, Windows)允許數據在被應用前通過一個key被分組。
- 一個DataSet被分組:
DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
一個key在DataStream中被指定:
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
Flink的數據模型不是基於 鍵值( key-value)對的。keys是虛擬的:他們被定義為實際數據的函數來引導分組操作符。
5.指定轉換函數
- 許多轉換需要用戶自定義函數。
- 實現一個接口:
class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } }; data.map(new MyMapFunction());
- 匿名類:
data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });
- 需要用戶定義函數的所有轉換都可以將rich函數作為參數。相比:
class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };
可以寫成:
class MyMapFunction extends RichMapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };
像往常一樣傳遞函數到map轉換:
data.map(new MyMapFunction());
rich函數也可以被定義為一個匿名類:
data.map (new RichMapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
rich函數除了提供像map一樣的用戶自定義功能外,還提供四個方法:
open
,close
,getRuntimeContext
, andsetRuntimeContext。這些是有用的對於向函數傳遞參數,創建並最終確定本地狀態,訪問廣播變量,和用於訪問運行時信息,如累加器和計數器,和信息迭代。
6.支持的數據類型
- Java Tuples and Scala Case Classes:java元組和Scala case類 元組是包含固定數量的含有各種類型的字段的復合類型。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements( new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 2)); wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() { @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } }); wordCounts.keyBy(0); // also valid .keyBy("f0")
- java POJOs:Java和Scala類被Flink作為一種特殊的POJO數據類型對待,並且類必須是public的,它必須有一個沒有參數的公共構造函數,所有的字段是公共或必須通過getter和setter函數來訪問,對於一個被叫做foo的字段,getter和setter函數必須命名為
getFoo()
和setFoo()。
public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)); wordCounts.keyBy("word"); // key by field expression "word"
- Primitive Types 基本數據類型 Flink支持java和Scala基本數據類型。
- General class types 一般類類型 Flink支持許多Java和Scala類(API和自定義)。所有不確定為POJO類類型(參見上面的POJO需求)是Flink一般類類型。Flink將這些數據類型視為黑盒,無法訪問其內容。
- Value 值類型通過實現org.apache.flinktypes.Value接口的讀和寫方法為那些操作提供定制代碼。當通用串行化是效率無效率的時候使用值類型是合理的。一個例子是一個數據類型實現稀疏向量元素的數組。知道數組大多是零,一個可以用一個特殊的編碼輸出非零元素,而通用串行化只會寫所有的數組元素。 Flink用預定義的值類型,對應於基本數據類型。 (
ByteValue
,ShortValue
,IntValue
,LongValue
,FloatValue
,DoubleValue
,StringValue
,CharValue
,BooleanValue
)。這些值類型作為可變的基本數據類型的變量,其價值可以更改,允許程序員重用對象和減輕垃圾收集器的壓力。 - Hadoop Writables
- Special Types
7.Accumulators & Counters
- Accumulators有簡單的結構包括添加操作和工作結束后可用的最后累積的結果。
- 最簡單的accumulator是counter:你可以用Accumulator.add(V value)方法使他自己增量。工作結束后Flink將合並所有部分結果並將結果發送給客戶端。Accumulators在調試或你想很快知道關於你的數據更多的時候是有用的。
- Flink有以下內置的Accumulators,每一個都實現了Accumulator接口:IntCounter,LongCounter, and IDoubleCounter。
- 如何使用accumulators: 首先你應該你想使用accumulator在用戶定義的轉換函數處必須創建一個accumulator對象(這里是一個計數器)。
private IntCounter numLines = new IntCounter();
第二,你必須注冊accumulator對象,通常在rich函數的open()方法。在這里你也可以定義名字。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
您現在可以在運算符函數任何地方使用這個accumulator,包括open()和close()方法。
this.numLines.add(1);
總的結果將被存儲在從執行環境的execute()方法返回的JobExecutionResult對象中。
myJobExecutionResult.getAccumulatorResult("num-lines")
每個job中所有的accumulator共享一個命名空間,因此,您可以使用相同的accumulator在你的job的不同運算符函數中。Flink將內部合並(合)並具有相同名稱的accumulator。
- 一般的,accumulators的結果只有在這個job結束之后才是可用的。
- Accumulator<V,R>是最靈活的:它定義了一個類型V用於值的增加,和最終結果類型R用於最終結果。