DataStream API介紹和示例
Flink程序運行流程
1. 獲取執行環境
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
2. 加載創建初始化數據
readTextFile()
addSource
..
3. 對數據在transformation operator
map
flatMap
filter
..
4. 指定計算結果的輸出位置 sink
print()
writeAdText(String path)
addSink
..
5. 觸發程序執行 execute
env.execute()
在sink是print時,不需要顯示execute,否則會報錯。因為在print方法里已經默認調用了execute。
StreamExecutionEnvironment
StreamExecutionEnvironment 作為程序入口context,有兩類:LocalStreamEnvironment(本地環境) 和RemoteStreamEnvironment(遠程環境)。
ExecutionConfig、CheckpointConfig等配置均在這里初始化。另外,這里也能設置線程數,檢查點周期,以及檢查點模式。還有狀態后端序列化類型以及注冊Type等。
- 如果集群是standalone模式,則StreamExecutionEnvironment.getExecutionEnvironment() 相當於StreamExecutionEnvironment.createLocalEnvironment()
DataStream Source
基於文件的
- readTextFile(String path) charsetName 默認用 UTF-8
- readTextFile(String path, String charsetName):文本文件,格式為 TextInputFormat,返回 BasicTypeInfo.STRING_TYPE_INFO ,TextInputFormat對象調用 setCharsetName(charsetName) 設置字符 ,然后底層再調用 readFile 方法。
- readFile(FileInputFormat
inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation typeInformation):根據給定格式和路徑讀取文件,根據watchType(FileProcessingMode.PROCESS_ONCE:處理一次路徑文件后退出,FileProcessingMode.PROCESS_CONTINUOUSLY:檢測給定路徑的新數據,此時若舊文件發生修改也會重讀,不符合exactly-once),interval 掃描路徑的周期。調用 createFileInput ,createFileInput 調用 addSource 。
Socket流
- socketTextStream(hostname , port) // 主機,端口號,字段分隔符 delimiter 默認為 \n
- socketTextStream(hostname , port , delimiter) // maxRetry 默認為零
- socketTextStream(hostname, port, delimiter, maxRetry )
maxRetry: 當socket端掛掉是,程序等待的最大重試時間。每秒都會重試連接,為0即停止程序...。利用 SocketTextStreamFunction 生成 sourceFunction對象,調用 addSource 生成DataStreamSource
基於數據集的
都是通過本身的SourceFunction對象調用addSource
- fromCollection(Iterator, class)
- fromCollection(Iterator, TypeInformation)
- fromElements(T...)
- fromParallelCollection
Customer Source 自定義source
- addSource(sourceFunction ) // sourceName 為默認值
- addSource(sourceFunction , sourceName) // typeInfo 為 null
- addSource(sourceFunction , typeInformation) // SourceName 有默認值
- addSource(sourceFunction , sourceName , typeInformation)
自定義source代碼示例
package source;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* RichSourceFunction 實現了SourceFunction接口,只做了序列化
* 實現接口SourceFunction或者繼承 RichSourceFunction 需要申明返回的數據類型,不然會報錯:
* Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
* The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
* Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
*/
public class MyDataSource extends RichSourceFunction<Integer> {
private static final Logger LOG = LoggerFactory.getLogger(MyDataSource.class);
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Integer> ctx) throws Exception{
while (isRunning){
Thread.sleep(300);
int rnd = (int) (Math.random() * 10);
LOG.info("emit data:"+rnd);
ctx.collect( rnd );
}
}
@Override
public void cancel() {
isRunning = false;
}
}
DataStream Transformations
Map [DataStream -> DataStream]
對數據集內每條數據都進行相同的規則處理,常用來做清洗和轉換數據格式等
DataStream<Tuple2<String, Integer>> windowCount
.map(new MapFunction<Tuple2<String,Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
FlatMap [DataStream -> DataStream]
將數據集進行打平,即按照邏輯合並在一個或多個數據集里面
DataStream<Tuple2<String, Integer>> windowCount = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : value.split("\\s")) {
collector.collect(Tuple2.of(word, 1));
}
}
})
Filter [DataStream -> DataStream]
過濾數據,符合要求的數據返回 true,不符合要求的返回 false
text
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.contains("h");
}
})
KeyBy [DataStream -> KeyedStream]
在數據集中進行 Partition操作,將相同的key值的數據放到相同的分區中,返回 keyedStream。
指定key時可以按位置指定,也可以按名稱指定(此時需要pojo類、case class等明確了字段位置的)
注意以下類型不能成為key:
- POJO類型但是不覆蓋 hashCode() 方法並依賴於Object.hashCode() 實現
- 任何類型的數組
Reduce [KeyedStream -> DataStream]
定義聚合邏輯,對數據進行聚合處理,其聚合邏輯要求滿足運算結合律和交換律。當前元素的值和最后一個Reduce的值進行組合並返回出新的Reduce的值。
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1+t1.f1);
}
})
Aggregate [keyedStream -> DataStream]
聚合算子,將Reduce算子中的函數進行了封裝。封裝的操作包括 sum、min、minBy、max、maxBy等
Fold [keyedStream -> DataStream]
將數據進行滾動折疊,可指定開始值。未來將取消,全部用Aggregate替代
Union
合並操作,要求兩個 DataStream 數據格式一樣
Connect
不要求格式一樣,類似拼接格式操作,返回 ConnectedStreams。
比如 (String,Int) connect (Int) 結果: ((String, Int), Int)
ConnectedStreams不能直接print,需要使用CoMapFunction 或CoFlatMapFunction分別處理DataStrea,處理后返回的數據類型必須保持一致。
Split [DataStream -> SplitStream]
Union算子的逆向實現
Select [SplitStream -> DataStream]
Select是splitStream的方法,split 只是進行標記,並未進行切分。select切分數據集。
SplitStream<String> split = text.split(new OutputSelector<String>() {
// 切分數據的時候給每部分數據打上標記
@Override
public Iterable<String> select(String value) {
ArrayList<String> strings = new ArrayList<>();
if (value.contains("h"))
strings.add("hadoop");
else
strings.add("noHadoop");
return strings;
}
});
// 打印有 hadoop 標簽的數據
split.select("hadoop").print();
// 打印有 noHadoop 標簽的數據
split.select("noHadoop")
.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s.toUpperCase();
}
})
.print();
Partition類 transformation
- shuffle: 隨機分配,分區相對均衡,容易失去原有數據分區結構
- rebalance: 盡可能保證每個分區的數據平衡,多用於數據傾斜
- rescale: 待定