DataStream API編程指導
文檔翻譯自Flink DataStream API Programming Guide
-----------------------------------------------------------------------
Flink中的DataStream程序是實現在數據流上的transformation(如filtering,updating state, defining windows,aggregating)的普通程序。創建數據流的來源多種多樣(如消息隊列,socket流,文件等)。程序通過data sink返回結果,如將數據寫入文件,或發送到標准輸出(如命令行終端)。Flink程序可以在多種上下文中運行,如獨立運行或是嵌入在其他程序中執行。程序的執行可以發生在本地JVM,或者在一個擁有許多設備的集群上。
有關介紹Flink API基礎概念的文檔,請見basic concepts
為了創建你自己的Flink DataStream程序,我們鼓勵你從文檔anatomy of a Flink Program開始,且歡迎你添加自己的transformations。該文檔接下來的部分是額外的operation和進階特性的參考文檔。
一、示例程序
下面的程序是一個完整的流式窗口word count應用,它計算出在web socket的大小為5秒的窗口中的出現各個單詞的數量。你可以復制 & 粘貼代碼並在本地運行。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
要運行該示例程序,首先從終端運行netcat來開始輸入流
nc -lk 9999
僅需要輸入一些單詞,這些將是word count程序的輸入數據。如果你想看到count大於1的結果,在5秒內重復輸入同一個單詞。
二、DataStream Transformations
Data transformation會將一或多個DataStream轉換成一個新的DataStream。程序可以將多個transformation結合形成復雜的拓撲結構(topology)。
本小節給出了所有可用的transformation的描述。
| Transformation |
描述 |
| Map DataStream -> DataStream |
獲取一個element並產出一個element。下例是一個將輸入*2的map方法: DataStream<Integer> dataStream = //... |
| FlapMap DataStream -> DataStream |
獲取一個element,並產生出0、1或多個element。下例是一個為句子分詞的flatmap方法
dataStream.flatMap(new FlatMapFunction<String, String>() { |
| Filter DataStream -> DataStream |
在每個獲取的element上運行一個boolean方法,留下那些方法返回true的element。下例是一個過濾掉0值的filter dataStream.filter(new FilterFunction<Integer>() { |
| KeyBy |
將流邏輯分為不相交的分區,每個分區包含的都是具有相同key的element,該分區方法使用hash分區實現。定義key的方法見於Keys。下例是一個返回KeyedDataStream的transformation。 dataStream.keyBy("someKey") // Key by field "someKey" |
| Reduce KeyedStream -> DataStream |
一個在keyed data stream上“滾動”進行的reduce方法。將上一個reduce過的值和當前element結合,產生新的值並發送出。下例是一個創建部分和的reduce方法。 keyedStream.reduce(new ReduceFunction<Integer>() { |
| Fold KeyedStream -> DataStream |
一個在帶有初始值的數據流上“滾動”進行的fold方法。將上一個fold的值和當前element結合,產生新的值並發送出。下例是一個fold方法,當應用於序列{1, 2, 3, 4, 5}時,它發出序列{"start-1", "start-1-2", "start-1-2-3" …}。 DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { |
| Aggregations KeyedStream -> DataStream |
在一個keyed DataStream上“滾動”進行聚合的方法。其中,min和minBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的element(max和maxBy一樣如此)。 keyedStream.sum(0); |
| Window KeyedStream - > WindowedStream |
Window可以定義在已經分區的KeyedStream上。窗口將根據一些特征(如最近5秒到達的數據)將數據按其各自的key集合在一起。有關窗口的完整描述見於windows // Last 5 seconds of data dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); |
| WindowAll DataStream -> AllWindowedStream |
Window可以定義在普通的DataStream上。窗口將根據一些特征(如最近5秒到達的數據)將所有Stream事件集合在一起。有關窗口的完整描述見於windows
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data |
| Window Apply WindowedStream -> DataStream AllWindowedStream -> DataStream |
將一個一般函數應用到window整體上去,下面是一個人工計算window中所有element的總和的應用。 windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { // applying an AllWindowFunction on non-keyed window stream |
| Window Reduce WindowedStream -> DataStream |
對窗口應用一個功能性reduce方法並返回reduce的結果 windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() { |
| Window Fold Windowed Stream -> DataStream |
對窗口應用一個功能性fold方法。下例代碼在應用到序列(1, 2, 3, 4, 5)時,它將該序列fold成為字符串"start-1-2-3-4-5" windowedStream.fold("start-", new FoldFunction<Integer, String>() { |
| Aggregations on windows WindowedStream -> DataStream |
對窗口中的內容聚合。其中,min和minBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的element(max和maxBy一樣如此)。 windowedStream.sum(0); |
| Union DataStream* -> DataStream |
將2個或多個data stream合並創建出一個新的包含所有stream的element的stream。注意:如果你對一個data stream自己進行union操作,則在返回的結果中,每個element都會出現2個。 dataStream.union(otherStream1, otherStream2, ...); |
| Window Join DataStream, DataStream -> DataStream |
在給定key和普通window中,將2個DataStream進行Join操作
dataStream.join(otherStream) |
| Window CoGroup DataStream, DataStream -> DataStream |
在給定key和普通window中,對2個DataStream進行CoGroup操作。 dataStream.coGroup(otherStream) |
| Connect DataStream, DataStream -> ConnectedStreams |
在保留兩個DataStream的類型的情況下,將二者"連接"起來。Connect使我們可以共享兩個Stream的狀態 DataStream<Integer> someStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); |
| CoMap, CoFlatMap ConnectedStreams -> DataStream |
該操作類似於map和flatMap針對連接的Data Stream版本。Sd connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override @Override |
| Split DataStream -> SplitStream |
根據某些標准將Stream分割成2個或更多的stream SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { |
| Select SplitStream -> DataStream |
從SplitStream中選擇1個或多個stream
SplitStream<Integer> split; |
| Iterate DataStream -> IterativeStream -> DataStream |
通過將一個Operator的輸出重定向到前面的某個Operator的方法,在數據流圖中創建一個“反饋”循環。這在定義持續更新模型的算法時十分有用。下面的例子從一個Stream開始,並持續應用迭代體(Iteration body)。大於0的element被送回到反饋通道,而其他的element則被轉發到下游。相關完整描述請見Iterations IterativeStream<Long> iteration = initialStream.iterate(); |
| Extract Timestamps DataStream -> DataStream |
通過從數據中抽取時間戳來使得通過使用事件時間語義的窗口可以工作。詳情見於Event Time。 stream.assignTimestamps (new TimeStampExtractor() {...}); |
接下來的Transformation是對Tuple類型的data stream可用的Transformation:
| Transformation |
描述 |
| Project DataStream -> DataStream |
從tuple中選擇出域的子集而產生新的DataStream DataStream<Tuple3<Integer, Double, String>> in = // [...] |
物理級分割(Physical Partitioning)
如果需要,Flink同樣提供了在進行一次transformation后針對精確stream分割的低層次的控制(low-level control),它們通過以下幾個方法實現。
| Transformations |
描述 |
| Custom partitioning DataStream -> DataStream |
使用一個用戶自定義的Partitioner來對每個element選擇目標任務sd dataStream.partitionCustom(partitioner, "someKey"); |
| Random partitioning DataStream -> DataStream |
根據均勻分布來隨機分割element dataStream.shuffle(); |
| Rebalancing(輪詢分割) DataStream -> DataStream |
輪詢分割element,創建相同負荷的分割。對數據變形(data skew)時的性能優化十分有用s dataStream.rebalance(); |
| Rescaling DataStream -> DataStream |
將element輪詢分割到下游Operator子集中去。這在你想流水線並行時十分有用,例如,需要從每個並行的source實例中將數據fan out到一個有着一些mapper來分發負載,但是又不想要函數rebalance()那樣引起的完全rebalance的效果時。這就需要僅在本地傳輸數據,而不是需要從網絡傳輸,這需要依賴其他諸如TaskManager的任務槽數量等等configuration值。
dataStream.rescale(); |
| Broadcasting DataStream -> DataStream |
將element廣播到每一個分割中去 dataStream.broadcast(); |
鏈接任務以及資源組(Task chaining & resource groups)
將兩個transformation鏈接起來意味着將它們部署在一起(co-locating),共享同一個線程來獲得更好的性能。Flink默認地盡可能地鏈接Operator(如兩個連續的map transformation)。如有需要,API還給出了細粒度的鏈接控制:
使用StreamExecutionEnvironment.disableOperatorChaining()來關閉整個Job的鏈接操作。下面表格中的方法則是更加細粒度的控制函數,注意,由於這些函數引用的是前一個transformation,所以它們僅僅在一個DataStream的transformation后使用才是正確的,例如someStream.map( … ).startNewChain()是正確的,而someStream.startNewChain()是錯誤的。
一個資源組就是Flink中的一個任務槽,如有需要,你可以人工孤立某個Operator到一個獨立的任務槽中。
| Transformation |
描述 |
| startNewChain() |
以當前Operator起點,開始一個新的鏈接。在下例中,兩個mapper將會被鏈接而filter則不會與第一個mapper鏈接 someStream.filter(...).map(...).startNewChain().map(...); |
| disableChaining() |
下例中,將不會鏈接mapOperator。 someStream.map(...).disableChaining(); |
| slotSharingGroup() |
設置一個Operation的共享任務槽的分組。Flink將會把同一個任務槽共享組的Operation放到同一個任務槽中,而不在同一個任務槽共享組的Operation放到其他任務槽中。這可以用來孤立任務槽。如果所有的輸入Operation都在同一個任務槽共享組中,則該任務槽共享組會繼承下來。任務槽共享組的默認名為"default",Operation可以通過調用slotSharingGroup("default")來定義其名稱。 someStream.filter(...).slotSharingGroup("name"); |
三、數據源
數據源可以通過StreamExecutionEnvironment.addSource(sourceFunction)來創建數據源。你可以使用Flink提供的source方法,也可以通過實現SourceFunction來編寫自定義的非並行數據源,也可以通過實現ParallelSourceFunction接口或繼承RichParallelSourceFunction來編寫自定義並行數據源。
以下是幾個預定義的數據流源,可以通過StreamExecutionEnvironment來訪問:
1. 基於文件的:
· readTextFile(path) / TextInputFormat - 以行讀取方式讀文件並返回字符串
· readFile(path) / 任意輸入格式 - 按用輸入格式的描述讀取文件
· readFileStream - 創建一個stream,在文件有改動時追加element
2. 基於Socket的:
· socketTextStream - 從socket讀取,element可以通過分割符來分開
3. 基於Collection的:
· fromCollection(Collection) - 從Java.util.Collection創建一個數據流。collection中所有的element都必須是同一類型的。
· fromCollection(Iterator, Class) - 從一個迭代器中創建一個數據流。class參數明確了迭代器返回的element的類型。
· fromElement(T …) - 從一個給定的對象序列創建一個數據流。所有對象都必須是同一類型的。
· fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中創建一個並行數據流。class參數明確了迭代器返回的element的類型。
· generateSequence(from, to) - 從一個給定區間中生成一個並行數字序列。
4. 自定義:
· addSource - 附上一個新的source方法。例如,通過調用addSource(new FlinkKafkaConsumer08<>(…))來從Apache Kafka讀取數據,更多信息見於connector
四、Data Sink
Data Sink消耗DataStream並將它們轉發到文件、socket、外部系統或打印它們。Flink自帶了許多內置的輸出格式,封裝為DataStream的operation中:
· writeAsText() / TextOutputFormat - 以行字符串的方式寫文件,字符串通過調用每個element的toString()方法獲得。
· writeAsCsv(…) / CsvOutputFormat - 以逗號分隔的值來講Tuple寫入文件,行和域的分隔符是可以配置的。每個域的值是通過調用object的toString()方法獲得的。
· print() / printToErr() - 將每個element的toString()值打印在標准輸出 / 標准錯誤流中。可以提供一個前綴(msg)作為輸出的前綴,使得在不同print的調用可以互相區分。如果並行度大於1,輸出也會以task的標識符(identifier)為產生的輸出的前綴。
· writeUsingOutputFormat() / FileOutputFormat - 自定義文件輸出所用的方法和基類,支持自定義object到byte的轉換。
· writeToSocket - 依據SerializationSchema將element寫到socket中。
· addSink - 調用自定義sink方法,Flink自帶連接到其他系統的connector(如Apache Kafka),這些connector都以sink方法的形式實現。
注意DataStream的write*()函數主要用於debug,它們不參與Flink的檢查點,這意味着這些方法通常處於“至少一次(at-least-once)“的執行語義下。flush到目標系統的數據依賴於OutputFormat的實現,這意味着不是所有發送到OutputFormat的element都會立即出現在目標系統中,此外,在失效的情況下,這些數據很可能會丟失。
故為了可靠性以及將stream“恰好一次(exact once)”地傳入文件系統,我們應當使用flink-connector-filesystem。此外,通過實現“.addSink(…)”的自定義內容會參加Flink的檢查點機制,故會保證“恰好一次”的執行語義。
五、迭代(Iterations)
迭代流程序實現了一個階段方法並將之嵌入到一個IterativeStream中。作為一個可能永遠不會結束的程序,它沒有最大迭代數,反之,你需要使用split或filter的transformation來明確流的哪一部分會被反饋到迭代中,哪一部分則繼續轉發到下游。這里,我們使用filter作為例子,我們定義IterativeStream:
IterativeStream<Integer> iteration = input.iterate();
然后,我們定義在循環中將要進行的邏輯處理,我們通過一系列transformation來實現(這里用了一個簡單的map transformation):
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
我們可以調用IterativeStream的closeWith(feedbackStream)函數來關閉一個迭代並定義迭代尾。傳遞給closeWith方法的DataStream將會反饋回迭代頭。分割出用來反饋的stream的部分和向前傳播的stream部分通常的方法便是使用filter來進行分割。這些filter可以定義諸如"termination"邏輯,即element將會傳播到下游,而不是被反饋回去。
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
默認地,反饋的那部分流將會自動設置為迭代頭的輸入,要想重載該行為,用戶需要設置closeWith函數中的一個boolean參數。例如,下面是一個持續將整數序列中的數字減1知道它們變為0的程序:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
六、執行參數
StreamExecutionEnvironment包含ExecutionConfig,它可以使用戶設置job的確切運行時配置值。
請參考execution configuration來查看參數的解釋。特別的,以下這些參數僅適用於DataStream API:
enableTimestamps() / disableTimestamps():在每一個source發出的事件上附加上一個時間戳。函數areTimestampsEnabled()可以返回該狀態的當前值。
setAutoWatermarkInterval(long milliseconds):設置自動水印發布(watermark emission)區間。你可以通過調用函數getAutoWatermarkInterval()來獲取當前值。
6.1 容錯
文檔Fault Tolerance Documentation描述了打開並配置Flink的檢查點機制的選項和參數
6.2 控制執行時間
默認的,element在網絡傳輸時不是一個個單獨傳輸的(這會導致不必要的網絡流量),而是緩存后傳輸。緩存(是在設備間傳輸的實際單位)的大小可以在Flink的配置文件中設置。盡管該方法有益於優化吞吐量,他會在stream到達不夠快時導致執行時間方面的問題。為了控制吞吐量和執行時間,你可以在執行環境(或獨立的Operator)中調用env.setBufferTimeout(timeoutMillis)來設置等待裝滿buffer的最大等待時間,在這個時間過后,不管buffer是否已滿,它都會自動發出。該默認超時時間是100ms。下例是設置API的用法:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
要最大化吞吐量,設置setBufferTimeout(-1)來去除超時時間,則buffer僅在它滿后才會被flush。要最小化執行時間,設置timeout為一個接近0的數字(如5ms或10ms)。應當避免設置Timeout為0,因為它會造成嚴重的性能下降。
七、Debugging
在分布式集群上運行流程序之前,確保算法正確執行很重要。因此,實現數據分析程序通常需要遞增的檢查結果、debug、優化的過程。
Flink提供了可以顯著簡化數據分析程序的開發過程的特性,即可以在IDE中本地進行debug、注入測試數據、以及結果數據的收集等。本節對如何簡化Flink程序開發提出幾點建議。
7.1 本地執行環境
LocalStreamEnvironment在創建它的同一個JVM進程下創建Flink系統。如果你從IDE中啟動一個LocalEnvironment,你可以在代碼中設置斷點來簡單地debug你的程序。下例為LocalEnvironment是如何創建並使用的:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
7.2 Collection數據源
Flink提供基於Java collection的特殊數據源來方便測試。一旦程序測試之后,source和sink可以簡單地替代為對外部系統的讀取/寫出的source和sink。Collection數據源使用方法如下:
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
注意:當前Collection數據源需要實現Serializable接口的數據類型和迭代器。此外,Collection數據源無法並行執行(並行度=1)
7.3 迭代器Data Sink
Flink同樣提供了一個收集測試和debug的DataStream結果的sink,它的使用方式如下:
import org.apache.flink.contrib.streaming.DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

