Flink Program Guide (2) -- 綜述 (DataStream API編程指導 -- For Java)


 

 

DataStream API編程指導

文檔翻譯自Flink DataStream API Programming Guide

-----------------------------------------------------------------------

Flink中的DataStream程序是實現在數據流上的transformation(filteringupdating state defining windowsaggregating)的普通程序。創建數據流的來源多種多樣(如消息隊列,socket流,文件等)。程序通過data sink返回結果,如將數據寫入文件,或發送到標准輸出(如命令行終端)。Flink程序可以在多種上下文中運行,如獨立運行或是嵌入在其他程序中執行。程序的執行可以發生在本地JVM,或者在一個擁有許多設備的集群上。

 

有關介紹Flink API基礎概念的文檔,請見basic concepts

 

為了創建你自己的Flink DataStream程序,我們鼓勵你從文檔anatomy of a Flink Program開始,且歡迎你添加自己的transformations。該文檔接下來的部分是額外的operation和進階特性的參考文檔。

一、示例程序

下面的程序是一個完整的流式窗口word count應用,它計算出在web socket的大小為5秒的窗口中的出現各個單詞的數量。你可以復制 & 粘貼代碼並在本地運行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import
org.apache.flink.api.java.tuple.Tuple2;
import
org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.windowing.time.Time;
import
org.apache.flink.util.Collector;

public class WindowWordCount {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
  .
socketTextStream("localhost", 9999)
  .
flatMap(new Splitter())
  .
keyBy(0)
  .
timeWindow(Time.seconds(5))
  .
sum(1);

dataStream.print();

env.execute("Window WordCount");

}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override

public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {

for (String word: sentence.split(" ")) {

out.collect(new Tuple2<String, Integer>(word, 1));

}

}

}

}

要運行該示例程序,首先從終端運行netcat來開始輸入流

nc -lk 9999

 

僅需要輸入一些單詞,這些將是word count程序的輸入數據。如果你想看到count大於1的結果,在5秒內重復輸入同一個單詞。

 

二、DataStream Transformations

Data transformation會將一或多個DataStream轉換成一個新的DataStream。程序可以將多個transformation結合形成復雜的拓撲結構(topology)。

 

本小節給出了所有可用的transformation的描述。

Transformation

描述

Map

DataStream -> DataStream

獲取一個element並產出一個element。下例是一個將輸入*2map方法:
 

DataStream<Integer> dataStream = //...
dataStream.
map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer
map(Integer value) throws Exception {
    return
2 * value;
  }
});

FlapMap

DataStream -> DataStream

獲取一個element,並產生出01或多個element。下例是一個為句子分詞的flatmap方法

 

dataStream.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public
void flatMap(String value, Collector<String> outthrows Exception {
    for(String word: value.
split(" ")){
    out.
collect(word);
    }
  }
});

Filter

DataStream -> DataStream

在每個獲取的element上運行一個boolean方法,留下那些方法返回trueelement。下例是一個過濾掉0值的filter
 

dataStream.filter(new FilterFunction<Integer>() {
  @Override
  public
boolean filter(Integer value) throws Exception {
    return value !=
0;
  }
});

KeyBy
DataStream -> KeyedStream

將流邏輯分為不相交的分區,每個分區包含的都是具有相同keyelement,該分區方法使用hash分區實現。定義key的方法見於Keys。下例是一個返回KeyedDataStreamtransformation
 

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.
keyBy(0) // Key by the first element of a Tuple

Reduce

KeyedStream -> DataStream

一個在keyed data stream滾動進行的reduce方法。將上一個reduce過的值和當前element結合,產生新的值並發送出。下例是一個創建部分和的reduce方法。
 

keyedStream.reduce(new ReduceFunction<Integer>() {
  @Override
  public Integer
reduce(Integer value1, Integer value2throws Exception {
    return value1 + value2;
  }
});

Fold

KeyedStream -> DataStream

一個在帶有初始值的數據流上滾動進行的fold方法。將上一個fold的值和當前element結合,產生新的值並發送出。下例是一個fold方法,當應用於序列{1, 2, 3, 4, 5}時,它發出序列{"start-1", "start-1-2", "start-1-2-3" …}
 

DataStream<String> result keyedStream.fold("start", new FoldFunction<Integer, String>() {
  @Override
  public String
fold(String current, Integer value) {
    return current +
"-" + value;
  }
});

Aggregations

KeyedStream -> DataStream

在一個keyed DataStream滾動進行聚合的方法。其中,minminBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的elementmaxmaxBy一樣如此)。
 

keyedStream.sum(0);
keyedStream.
sum("key");
keyedStream.
min(0);
keyedStream.
min("key");
keyedStream.
max(0);
keyedStream.
max("key");
keyedStream.
minBy(0);
keyedStream.
minBy("key");
keyedStream.
maxBy(0);
keyedStream.
maxBy("key");

Window

KeyedStream - > WindowedStream

Window可以定義在已經分區的KeyedStream上。窗口將根據一些特征(如最近5秒到達的數據)將數據按其各自的key集合在一起。有關窗口的完整描述見於windows
 

// Last 5 seconds of data

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

WindowAll

DataStream -> AllWindowedStream

Window可以定義在普通的DataStream上。窗口將根據一些特征(如最近5秒到達的數據)將所有Stream事件集合在一起。有關窗口的完整描述見於windows
警告:該transformation在很多情況下都不是並行化的,所有數據將被收集到一個運行windowAll Operator的任務上。

 

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

Window Apply

WindowedStream -> DataStream

AllWindowedStream -> DataStream

將一個一般函數應用到window整體上去,下面是一個人工計算window中所有element的總和的應用。
注意:如果你正在使用一個windowAlltransformation,你需要使用AllWindowFunction來代替下例中的參數。
 

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public
void apply (Tuple tuple,
  Window window,
  Iterable<Tuple2<String, Integer>> values,
  Collector<Integer> out) throws Exception {
    
int sum = 0;
    for (value t: values) {
      sum += t.
f1;
    }
    out.
collect (new Integer(sum));
  }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.
apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public
void apply (Window window,
  Iterable<Tuple2<String, Integer>> values,
  Collector<Integer> out) throws Exception {
    
int sum = 0;
    for (value t: values) {
      sum += t.
f1;
    }
    out.
collect (new Integer(sum));
  }
});

Window Reduce

WindowedStream -> DataStream

對窗口應用一個功能性reduce方法並返回reduce的結果
 

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() {
  public Tuple2<String, Integer>
reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
  return new Tuple2<String,Integer>(value1.
f0, value1.f1 + value2.f1);
  }
};

Window Fold

Windowed Stream -> DataStream

對窗口應用一個功能性fold方法。下例代碼在應用到序列(1, 2, 3, 4, 5)時,它將該序列fold成為字符串"start-1-2-3-4-5"
 

windowedStream.fold("start-", new FoldFunction<Integer, String>() {
  public String
fold(String current, Integer value) {
    return current +
"-" + value;
  }
};

Aggregations on windows

WindowedStream -> DataStream

對窗口中的內容聚合。其中,minminBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的elementmaxmaxBy一樣如此)。
 

windowedStream.sum(0);
windowedStream.
sum("key");
windowedStream.
min(0);
windowedStream.
min("key");
windowedStream.
max(0);
windowedStream.
max("key");
windowedStream.
minBy(0);
windowedStream.
minBy("key");
windowedStream.
maxBy(0);
windowedStream.
maxBy("key");

Union

DataStream* -> DataStream

2個或多個data stream合並創建出一個新的包含所有streamelementstream。注意:如果你對一個data stream自己進行union操作,則在返回的結果中,每個element都會出現2個。
 

dataStream.union(otherStream1, otherStream2, ...);

Window Join

DataStream, DataStream -> DataStream

在給定key和普通window中,將2DataStream進行Join操作

 

dataStream.join(otherStream)
.
where(0).equalTo(1)
.
window(TumblingEventTimeWindows.of(Time.seconds(3)))
.
apply (new JoinFunction () {...});

Window CoGroup

DataStream, DataStream -> DataStream

在給定key和普通window中,對2DataStream進行CoGroup操作。
 

dataStream.coGroup(otherStream)
.
where(0).equalTo(1)
.
window(TumblingEventTimeWindows.of(Time.seconds(3)))
.
apply (new CoGroupFunction () {...});

Connect

DataStream, DataStream -> ConnectedStreams

在保留兩個DataStream的類型的情況下,將二者"連接"起來。Connect使我們可以共享兩個Stream的狀態
 

DataStream<Integer> someStream = //...
DataStream<String> otherStream =
//...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

ConnectedStreams -> DataStream

該操作類似於mapflatMap針對連接的Data Stream版本。Sd
 

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
  @Override
  public Boolean
map1(Integer value) {
    return true;
  }

  @Override
  public Boolean
map2(String value) {
    return false;
  }
});

 

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

  @Override
  public
void flatMap1(Integer value, Collector<String> out) {
    out.
collect(value.toString());
  }

  @Override
  public
void flatMap2(String value, Collector<String> out) {
    for (String word: value.
split(" ")) {
      out.
collect(word);
    }
  }
});

Split

DataStream -> SplitStream

根據某些標准將Stream分割成2個或更多的stream
 

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
  @Override
  public Iterable<String>
select(Integer value) {
    List<String> output = new ArrayList<String>();
    if (value %
2 == 0) {
      output.
add("even");
    }
    else {
      output.
add("odd");
    }
    return output;
  }
});

Select

SplitStream -> DataStream

SplitStream中選擇1個或多個stream

 

SplitStream<Integer> split;
DataStream<Integer> even = split.
select("even");
DataStream<Integer> odd = split.
select("odd");
DataStream<Integer> all = split.
select("even","odd");

Iterate

DataStream -> IterativeStream -> DataStream

通過將一個Operator的輸出重定向到前面的某個Operator的方法,在數據流圖中創建一個反饋循環。這在定義持續更新模型的算法時十分有用。下面的例子從一個Stream開始,並持續應用迭代體(Iteration body)。大於0element被送回到反饋通道,而其他的element則被轉發到下游。相關完整描述請見Iterations
 

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.
map (/*do something*/);
DataStream<Long> feedback = iterationBody.
filter(new FilterFunction<Long>(){
  @Override
  public
boolean filter(Integer value) throws Exception {
    return value >
0;
  }
});
iteration.
closeWith(feedback);
DataStream<Long> output = iterationBody.
filter(new FilterFunction<Long>(){
  @Override
  public
boolean filter(Integer value) throws Exception {
    return value <=
0;
  }
});

Extract Timestamps

DataStream -> DataStream

通過從數據中抽取時間戳來使得通過使用事件時間語義的窗口可以工作。詳情見於Event Time
 

stream.assignTimestamps (new TimeStampExtractor() {...});

 

接下來的Transformation是對Tuple類型的data stream可用的Transformation

Transformation

描述

Project

DataStream -> DataStream

tuple中選擇出域的子集而產生新的DataStream
 

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.
project(2,0);

 

物理級分割(Physical Partitioning

如果需要,Flink同樣提供了在進行一次transformation后針對精確stream分割的低層次的控制(low-level control),它們通過以下幾個方法實現。

 

Transformations

描述

Custom partitioning

DataStream -> DataStream

使用一個用戶自定義的Partitioner來對每個element選擇目標任務sd
 

dataStream.partitionCustom(partitioner, "someKey");
dataStream.
partitionCustom(partitioner, 0);

Random partitioning

DataStream -> DataStream

根據均勻分布來隨機分割element
 

dataStream.shuffle();

Rebalancing(輪詢分割)

DataStream -> DataStream

輪詢分割element,創建相同負荷的分割。對數據變形(data skew)時的性能優化十分有用s
 

dataStream.rebalance();

Rescaling

DataStream -> DataStream

element輪詢分割到下游Operator子集中去。這在你想流水線並行時十分有用,例如,需要從每個並行的source實例中將數據fan out到一個有着一些mapper來分發負載,但是又不想要函數rebalance()那樣引起的完全rebalance的效果時。這就需要僅在本地傳輸數據,而不是需要從網絡傳輸,這需要依賴其他諸如TaskManager的任務槽數量等等configuration值。
上游Operation發送element的下游Operation子集同時依賴於上游和下游兩方Operation的並行度。例如,若上游Operation的並行度為2,下游Operation並行度為4,則1個上游Operation將會把它的element分發給2個下游Operation。另一方面,若下游並行度為2而上游並行度為4,則2個上游Operation將會把它們的element分發給1個下游Operation,而另外兩個上游Operation則分發給另一個下游Operation
當一個或是多個上下游Operation的並行度不是倍數關系時,下游的Operation將擁有不同的從上游獲得的輸入的數量。
下圖是上面例子的連接模式圖:
 

dataStream.rescale();

Broadcasting

DataStream -> DataStream

element廣播到每一個分割中去
 

dataStream.broadcast();

 

鏈接任務以及資源組(Task chaining & resource groups

將兩個transformation鏈接起來意味着將它們部署在一起(co-locating),共享同一個線程來獲得更好的性能。Flink默認地盡可能地鏈接Operator(如兩個連續的map transformation)。如有需要,API還給出了細粒度的鏈接控制:

 

使用StreamExecutionEnvironment.disableOperatorChaining()來關閉整個Job的鏈接操作。下面表格中的方法則是更加細粒度的控制函數,注意,由於這些函數引用的是前一個transformation,所以它們僅僅在一個DataStreamtransformation后使用才是正確的,例如someStream.map( … ).startNewChain()是正確的,而someStream.startNewChain()錯誤的。

 

一個資源組就是Flink中的一個任務槽,如有需要,你可以人工孤立某個Operator到一個獨立的任務槽中。

Transformation

描述

startNewChain()

以當前Operator起點,開始一個新的鏈接。在下例中,兩個mapper將會被鏈接而filter則不會與第一個mapper鏈接
 

someStream.filter(...).map(...).startNewChain().map(...);

disableChaining()

下例中,將不會鏈接mapOperator
 

someStream.map(...).disableChaining();

slotSharingGroup()

設置一個Operation的共享任務槽的分組。Flink將會把同一個任務槽共享組的Operation放到同一個任務槽中,而不在同一個任務槽共享組的Operation放到其他任務槽中。這可以用來孤立任務槽。如果所有的輸入Operation都在同一個任務槽共享組中,則該任務槽共享組會繼承下來。任務槽共享組的默認名為"default"Operation可以通過調用slotSharingGroup("default")來定義其名稱。
 

someStream.filter(...).slotSharingGroup("name");

 

三、數據源

數據源可以通過StreamExecutionEnvironment.addSource(sourceFunction)來創建數據源。你可以使用Flink提供的source方法,也可以通過實現SourceFunction來編寫自定義的非並行數據源,也可以通過實現ParallelSourceFunction接口或繼承RichParallelSourceFunction來編寫自定義並行數據源。

以下是幾個預定義的數據流源,可以通過StreamExecutionEnvironment來訪問:

1.    基於文件的:

·        readTextFile(path) / TextInputFormat - 以行讀取方式讀文件並返回字符串

·        readFile(path) / 任意輸入格式 - 按用輸入格式的描述讀取文件

·        readFileStream - 創建一個stream,在文件有改動時追加element

2.    基於Socket的:

·        socketTextStream - socket讀取,element可以通過分割符來分開

3.    基於Collection的:

·        fromCollection(Collection) - Java.util.Collection創建一個數據流。collection中所有的element都必須是同一類型的。

·        fromCollection(Iterator, Class) - 從一個迭代器中創建一個數據流。class參數明確了迭代器返回的element的類型。

·        fromElement(T …) - 從一個給定的對象序列創建一個數據流。所有對象都必須是同一類型的。

·        fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中創建一個並行數據流。class參數明確了迭代器返回的element的類型。

·        generateSequence(from, to) - 從一個給定區間中生成一個並行數字序列。

4.    自定義:

·        addSource - 附上一個新的source方法。例如,通過調用addSource(new FlinkKafkaConsumer08<>(…))來從Apache Kafka讀取數據,更多信息見於connector

 

四、Data Sink

Data Sink消耗DataStream並將它們轉發到文件、socket、外部系統或打印它們。Flink自帶了許多內置的輸出格式,封裝為DataStreamoperation中:

·        writeAsText() / TextOutputFormat - 以行字符串的方式寫文件,字符串通過調用每個elementtoString()方法獲得。

·        writeAsCsv(…) / CsvOutputFormat - 以逗號分隔的值來講Tuple寫入文件,行和域的分隔符是可以配置的。每個域的值是通過調用objecttoString()方法獲得的。

·        print() / printToErr() - 將每個elementtoString()值打印在標准輸出 / 標准錯誤流中。可以提供一個前綴(msg)作為輸出的前綴,使得在不同print的調用可以互相區分。如果並行度大於1,輸出也會以task的標識符(identifier)為產生的輸出的前綴。

·        writeUsingOutputFormat() / FileOutputFormat - 自定義文件輸出所用的方法和基類,支持自定義objectbyte的轉換。

·        writeToSocket - 依據SerializationSchemaelement寫到socket中。

·        addSink - 調用自定義sink方法,Flink自帶連接到其他系統的connector(如Apache Kafka),這些connector都以sink方法的形式實現。

 

注意DataStreamwrite*()函數主要用於debug,它們不參與Flink的檢查點,這意味着這些方法通常處於至少一次(at-least-once的執行語義下。flush到目標系統的數據依賴於OutputFormat的實現,這意味着不是所有發送到OutputFormatelement都會立即出現在目標系統中,此外,在失效的情況下,這些數據很可能會丟失。

 

故為了可靠性以及將stream“恰好一次(exact once地傳入文件系統,我們應當使用flink-connector-filesystem。此外,通過實現.addSink(…)的自定義內容會參加Flink的檢查點機制,故會保證恰好一次的執行語義。

 

五、迭代(Iterations

迭代流程序實現了一個階段方法並將之嵌入到一個IterativeStream中。作為一個可能永遠不會結束的程序,它沒有最大迭代數,反之,你需要使用splitfiltertransformation來明確流的哪一部分會被反饋到迭代中,哪一部分則繼續轉發到下游。這里,我們使用filter作為例子,我們定義IterativeStream

IterativeStream<Integer> iteration = input.iterate();

然后,我們定義在循環中將要進行的邏輯處理,我們通過一系列transformation來實現(這里用了一個簡單的map transformation):

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

 

我們可以調用IterativeStreamcloseWith(feedbackStream)函數來關閉一個迭代並定義迭代尾。傳遞給closeWith方法的DataStream將會反饋回迭代頭。分割出用來反饋的stream的部分和向前傳播的stream部分通常的方法便是使用filter來進行分割。這些filter可以定義諸如"termination"邏輯,即element將會傳播到下游,而不是被反饋回去。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.
filter(/* some other part of the stream */);

 

默認地,反饋的那部分流將會自動設置為迭代頭的輸入,要想重載該行為,用戶需要設置closeWith函數中的一個boolean參數。例如,下面是一個持續將整數序列中的數字減1知道它們變為0的程序:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long
map(Long value) throws Exception {
    return value -
1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public
boolean filter(Long value) throws Exception {
    return (value >
0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public
boolean filter(Long value) throws Exception {
    return (value <=
0);
  }
});

 

六、執行參數

StreamExecutionEnvironment包含ExecutionConfig,它可以使用戶設置job的確切運行時配置值。

請參考execution configuration來查看參數的解釋。特別的,以下這些參數僅適用於DataStream API

enableTimestamps() / disableTimestamps():在每一個source發出的事件上附加上一個時間戳。函數areTimestampsEnabled()可以返回該狀態的當前值。

setAutoWatermarkInterval(long milliseconds):設置自動水印發布(watermark emission)區間。你可以通過調用函數getAutoWatermarkInterval()來獲取當前值。

 

6.1 容錯

文檔Fault Tolerance Documentation描述了打開並配置Flink的檢查點機制的選項和參數

 

6.2 控制執行時間

默認的,element在網絡傳輸時不是一個個單獨傳輸的(這會導致不必要的網絡流量),而是緩存后傳輸。緩存(是在設備間傳輸的實際單位)的大小可以在Flink的配置文件中設置。盡管該方法有益於優化吞吐量,他會在stream到達不夠快時導致執行時間方面的問題。為了控制吞吐量和執行時間,你可以在執行環境(或獨立的Operator)中調用env.setBufferTimeout(timeoutMillis)來設置等待裝滿buffer的最大等待時間,在這個時間過后,不管buffer是否已滿,它都會自動發出。該默認超時時間是100ms。下例是設置API的用法:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.
setBufferTimeout(timeoutMillis);

env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

 

要最大化吞吐量,設置setBufferTimeout(-1)來去除超時時間,則buffer僅在它滿后才會被flush。要最小化執行時間,設置timeout為一個接近0的數字(如5ms10ms)。應當避免設置Timeout0,因為它會造成嚴重的性能下降。

 

七、Debugging

在分布式集群上運行流程序之前,確保算法正確執行很重要。因此,實現數據分析程序通常需要遞增的檢查結果、debug、優化的過程。

Flink提供了可以顯著簡化數據分析程序的開發過程的特性,即可以在IDE中本地進行debug、注入測試數據、以及結果數據的收集等。本節對如何簡化Flink程序開發提出幾點建議。

 

7.1 本地執行環境

LocalStreamEnvironment在創建它的同一個JVM進程下創建Flink系統。如果你從IDE中啟動一個LocalEnvironment,你可以在代碼中設置斷點來簡單地debug你的程序。下例為LocalEnvironment是如何創建並使用的:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

 

7.2 Collection數據源

Flink提供基於Java collection的特殊數據源來方便測試。一旦程序測試之后,sourcesink可以簡單地替代為對外部系統的讀取/寫出的sourcesinkCollection數據源使用方法如下:

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.
fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.
fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.
fromCollection(longIt, Long.class);

 

注意:當前Collection數據源需要實現Serializable接口的數據類型和迭代器。此外,Collection數據源無法並行執行(並行度=1

 

7.3 迭代器Data Sink

Flink同樣提供了一個收集測試和debugDataStream結果的sink,它的使用方式如下:

import org.apache.flink.contrib.streaming.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.
collect(myResult)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM