1.1 Data Source數據源
在實時計算DataStream API中,Source是用來獲取外部數據源的操作,按照獲取數據的方式,可以分為:基於集合的Source、基於Socket網絡端口的Source、基於文件的Source、第三方Connector Source和自定義Source五種。前三種Source是Flink已經封裝好的方法,這些Source只要調用StreamExecutionEnvironment的對應方法就可以創建DataStream了,使用起來比較簡單,我們在學習和測試的時候會經常用到。如果以后生產環境想要從一些分布式、高可用的消息中間件中讀取數據,可以使用第三方Connector Source,比如Apache Kafka Source、AWS Kinesis Source、Google Cloud PubSub Source等(國內公司使用比較多的是Kafka這個消息中間件作為數據源),使用這些第三方的Source,需要額外引入對應消息中間件的依賴jar包。於此同時Flink允許開發者根據自己的需求,自定義各種Source,只要實現SourceFunction這個接口,然后將該實現類的實例作為參數傳入到StreamExecutionEnvironment的addSource方法就可以了,這樣大大的提高了Flink與外部數據源交互的靈活性。
從並行度的角度,Source又可以分為非並行的Source和並行的Source。非並行的Source它的並行度只能為1,即用來讀取外部數據源的Source只有一個實例,在讀取大量數據時效率比較低,通常是用來做一些實驗或測試,例如Flink的Socket網絡端口讀取數據的Source就是一個非並行的Source;並行的Source它的並行度可以是1到多個,即用來讀取外部數據源的Source可以有一個到多個實例(在分布式計算中,並行度是影響吞吐量一個非常重要的因素,在計算資源足夠的前提下,並行度越大,效率越高)。例如Kafka Source就是並行的Source。
多並行Source:
非並行的Source:
單並行的Source直接實現了SourceFunction接口;
多並行的Source,可以繼承RichParallelSourceFunction或實現parallelSourceFunction接口;
1.1.1 基於集合的Source
基於集合的Source是將一個普通的Java集合、迭代器或者可變參數轉換成一個分布式數據集DataStreamSource,它是DataStream的子類,所以也可以使用DataStream類型來引用。得到DataStream后就可以調用Transformation或Sink度數據進行處理了。
1. fromElements
fromElements(T …) 方法是一個非並行的Source,可以將一到多個數據作為可變參數傳入到該方法中,返回DataStreamSource;
並行度為1 ,是一個有限的數據流,程序執行完就退出,通常用來作實驗;
public class FromElementDemo { public static void main(String[] args) throws Exception { //創建流計算執行上下文環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //指定多個相同類型的數據創建DataStream DataStreamSource<String> words = env.fromElements(“flink”, “hadoop”, “flink”); //調用Sink將數據在控制台打印 words.print(); //執行 env.execute(“FromElementDemo”); } }
- fromCollection
fromCollection(Collection) 方法也是一個非並行的Source,可以將一個Collection類型的數據作為參數傳入到該方法中,返回一個DataStreamSource;
並行度為1 ,是一個有限的數據流,程序執行完就退出,通常用來作實驗;
//創建一個List List<String> wordList = Arrays.asList(“flink”, “spark”, “hadoop”, “flink”);
//將List並行化成DataStream DataStreamSource<String> words = env.fromCollection(wordList);
- fromParallelCollection
fromParallelCollection(SplittableIterator, Class) 方法是一個並行的Source(並行度可以使用env的setParallelism來設置),該方法需要傳入兩個參數,第一個是繼承SplittableIterator的實現類的迭代器,第二個是迭代器中數據的類型。
多並行,有限的數據流,程序執行完就退出,通常用來作實驗;
package cn._51doit.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;
//多並行的Source
public class FromParCollection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> nums = env.fromParallelCollection(new NumberSequenceIterator(1L, 20L), Long.class);
int parallelism = nums.getParallelism();
System.out.println("fromParallelCollection創建的DataStream的並行度為:" + parallelism);
nums.print();
env.execute();
}
}
- generateSequence
generateSequence(long from, long to) 方法是一個並行的Source(並行度也可以通過調用該方法后,再調用setParallelism來設置)該方法需要傳入兩個long類型的參數,第一個是起始值,第二個是結束值,返回一個DataStreamSource。
//調用env的generateSequence生成並行的DataSource,輸出的數字是1到100 DataStreamSource<Long> numbers = env.generateSequence(1L, 100L).setParallelism(3);
1.1.2 基於Socket的Source
socketTextStream(String hostname, int port) 方法是一個非並行的Source,該方法需要傳入兩個參數,第一個是指定的IP地址或主機名,第二個是端口號,即從指定的Socket讀取數據創建DataStream。該方法還有多個重載的方法,其中一個是socketTextStream(String hostname, int port, String delimiter, long maxRetry),這個重載的方法可以指定行分隔符和最大重新連接次數。這兩個參數,默認行分隔符是”\n”,最大重新連接次數為0。
//調用env的socketTextStream方法,從指定的Socket地址和端口創建DataStream DataStreamSource<String> lines = env.socketTextStream(“localhost”, 8888);
提示:如果使用socketTextStream讀取數據,在啟動Flink程序之前,必須先啟動一個Socket服務,為了方便,Mac或Linux用戶可以在命令行終端輸入nc -lk 8888啟動一個Socket服務並在命令行中向該Socket服務發送數據。Windows用戶可以在百度中搜索windows安裝netcat命令。
1.1.3 基於文件的Source
基於文件的Source,本質上就是使用指定的FileInputFormat格式讀取數據,可以指定TextInputFormat、CsvInputFormat、BinaryInputFormat等格式,基於文件的Source底層都是ContinuousFileMonitoringFunction,這個類繼承了RichSourceFunction,它們都是非並行的Source。
- readFile
readFile(FileInputFormat inputFormat, String filePath) 方法可以指定讀取文件的FileInputFormat 格式,其中一個重載的方法readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval) 可以指定FileProcessingMode,它有兩個枚舉類型分別是PROCESS_ONCE和PROCESS_CONTINUOUSLY模式,PROCESS_ONCE模式Source只讀取文件中的數據一次,讀取完成后,程序退出。PROCESS_CONTINUOUSLY模式Source會一直監聽指定的文件,如果使用該模式,需要指定檢測該文件是否發生變化的時間間隔,但是使用這種模式,文件的內容發生變化后,會將這個變化的文件以前的內容和新的內容全部都讀取出來,進而造成數據重復讀取,是一個多並行Source。
String path = “file:///Users/xing/Desktop/a.txt”; //PROCESS_CONTINUOUSLY模式是一直監聽指定的文件或目錄,2秒鍾檢測一次文件是否發生變化 DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
- readTextFile
readTextFile(String filePath) 可以從指定的目錄或文件讀取數據,默認使用的是TextInputFormat格式讀取數據,還有一個重載的方法readTextFile(String filePath, String charsetName)可以傳入讀取文件指定的字符集,默認是UTF-8編碼。該方法是一個有限的數據源,數據讀完后,程序就會退出,不能一直運行。該方法底層調用的是readFile方法,FileProcessingMode為PROCESS_ONCE
DataStreamSource<String> lines = env.readTextFile(path);
1.1.4 第三方Connector Source
在現實生產環境中,為了保證flink可以高效地讀取數據源中的數據,通常是跟一些分布式消息中件結合使用,例如Apache Kafka。Kafka的特點是分布式、多副本、高可用、高吞吐、可以記錄偏移量等。Flink和Kafka整合可以高效的讀取數據,並且可以保證Exactly Once(精確一次性語義)。
- Kafka Consumer
首先在maven項目的pom.xml文件中導入Flink跟Kafka整合的依賴
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
在代碼中,創建一個Properties對象,然后設置Kafka的地址和端口、讀取偏移量的策略、消費者組ID、並且設置Source讀取完Kafka的數據后,定期更新偏移量。然后new FlinkKafkaConsumer,指定三個參數,第一個參數是topic名稱;第二個參數是讀取文件的反序列化Schema,SimpleStringSchema指的是讀取Kafka中的數據反序列化成String格式;第三個參數是傳入事先new 好的Properties實例。然后調用env的addSource方法將FlinkKafkaConsumer的實例傳入,這樣就創建好了一個DataSteamSource。
//設置Kafka相關參數 Properties properties = new Properties();//設置Kafka的地址和端口 properties.setProperty(“bootstrap.servers”, “node-1.:9092,node-2:9092,node-3:9092”); //讀取偏移量策略:如果沒有記錄偏移量,就從頭讀,如果記錄過偏移量,就接着讀 properties.setProperty(“auto.offset.reset”, “earliest”); //設置消費者組ID properties.setProperty(“group.id”, “g1”); //沒有開啟checkpoint,讓flink提交偏移量的消費者定期自動提交偏移量 properties.setProperty(“enable.auto.commit”, “true”); //創建FlinkKafkaConsumer並傳入相關參數 FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( “test”, //要讀取數據的Topic名稱 new SimpleStringSchema(), //讀取文件的反序列化Schema properties //傳入Kafka的參數 ); //使用addSource添加kafkaConsumer DataStreamSource<String> lines = env.addSource(kafkaConsumer);
注意:目前這種方式無法保證Exactly Once,Flink的Source消費完數據后,將偏移量定期的寫入到Kafka的一個特殊的topic中,這個topic就是__consumer_offset,這種方式雖然可以記錄偏移量,但是無法保證Exactly Once,后面學完了State后,再實現Exactly Once功能。
1.1.5 自定義Source
Flink的DataStream API可以讓開發者根據實際需要,靈活的自定義Source,本質上就是定義一個類,實現SourceFunction這個接口,實現run方法和cancel方法。run方法中實現的就是獲取數據的邏輯,然后調用SourceContext的collect方法,將獲取的數據收集起來,這樣就返回了一個新的DataStreamSource,但是如果只實現這個接口,該Source只能是一個非並行的Source。在生產環境,通常是希望Source可以並行的讀取數據,這樣讀取數據的速度才更快,所以最好的方式是實現ParallelSourceFunction接口或繼承RichParallelSourceFunction這個抽象類,同樣實現實現run方法和cancel方法,這樣該Source就是一個可以並行的Source了。其實所有的Source底層都是調用該的方法。下面是一個簡單的並行的Source,后面學習了State和Checkpoint,再定義一個可以實現Exactly Once的並行的Source。
public class MyParallelSource extends RichParallelSourceFunction<String> { private int i = 1; //定義一個int類型的變量,從1開始 private boolean flag = true; //定義一個flag標標志 //run方法就是用來讀取外部的數據或產生數據的邏輯 @Override public void run(SourceContext<String> ctx) throws Exception { //滿足while循環的條件,就將數據通過SourceContext收集起來 while (i <= 10 && flag) { Thread.sleep(1000); //為避免太快,睡眠1秒 ctx.collect(“data:” + i++); //將數據通過SourceContext收集起來 } } //cancel方法就是讓Source停止 @Override public void cancel() { //將flag設置成false,即停止Source flag = false; } }