本章介紹了Flink DataStream API的基本知識。我們展示了典型的Flink流處理程序的結構和組成部分,還討論了Flink的類型系統以及支持的數據類型,還展示了數據和分區轉換操作。窗口操作符,基於時間語義的轉換操作,有狀態的操作符,以及和外部系統的連接器將在接下來的章節進行介紹。閱讀完這一章后,我們將會知道如何去實現一個具有基本功能的流處理程序。我們的示例程序采用Scala語言,因為Scala語言相對比較簡潔。但Java API也是十分類似的(特殊情況,我們將會指出)。在我們的Github倉庫里,我們所寫的應用程序具有Scala和Java兩種版本。
1 你好,Flink!
讓我們寫一個簡單的例子來獲得使用DataStream API編寫流處理應用程序的粗淺印象。我們將使用這個簡單的示例來展示一個Flink程序的基本結構,以及介紹一些DataStream API的重要特性。我們的示例程序攝取了一條(來自多個傳感器的)溫度測量數據流。
首先讓我們看一下表示傳感器讀數的數據結構:
scala version
case class SensorReading(id: String, timestamp: Long, temperature: Double)
java version
public class SensorReading { public String id; public long timestamp; public double temperature; public SensorReading() { } public SensorReading(String id, long timestamp, double temperature) { this.id = id; this.timestamp = timestamp; this.temperature = temperature; } public String toString() { return "(" + this.id + ", " + this.timestamp + ", " + this.temperature + ")"; } }
示例程序5-1將溫度從華氏溫度讀數轉換成攝氏溫度讀數,然后針對每一個傳感器,每5秒鍾計算一次平均溫度紙。
scala version
object AverageSensorReadings { def main(args: Array[String]) { // 創建運行時環境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 使用事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sensorData: DataStream[SensorReading] = env.addSource(new SensorSource) val avgTemp = sensorData .map(r => { val celsius = (r.temperature - 32) * (5.0 / 9.0) SensorReading(r.id, r.timestamp, celsius) }) .keyBy(_.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager) avgTemp.print() env.execute("Compute average sensor temperature") } }
java version
public class AverageSensorReadings { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<SensorReading> sensorData = env.addSource(new SensorSource()); DataStream<T> avgTemp = sensorData .map(r -> { Double celsius = (r.temperature - 32) * (5.0 / 9.0); return SensorReading(r.id, r.timestamp, celsius); }) .keyBy(r -> r.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager()); avgTemp.print(); env.execute("Compute average sensor temperature"); }
你可能已經注意到Flink程序的定義和提交執行使用的就是正常的Scala或者Java的方法。大多數情況下,這些代碼都寫在一個靜態main方法中。在我們的例子中,我們定義了AverageSensorReadings對象,然后將大多數的應用程序邏輯放在了main()中。
Flink流處理程序的結構如下:
- 創建Flink程序執行環境。
- 從數據源讀取一條或者多條流數據
- 使用流轉換算子實現業務邏輯
- 將計算結果輸出到一個或者多個外部設備(可選)
- 執行程序
接下來我們詳細的學習一下這些部分。
2 搭建執行環境
編寫Flink程序的第一件事情就是搭建執行環境。執行環境決定了程序是運行在單機上還是集群上。在DataStream API中,程序的執行環境是由StreamExecutionEnvironment設置的。在我們的例子中,我們通過調用靜態getExecutionEnvironment()方法來獲取執行環境。這個方法根據調用方法的上下文,返回一個本地的或者遠程的環境。如果這個方法是一個客戶端提交到遠程集群的代碼調用的,那么這個方法將會返回一個遠程的執行環境。否則,將返回本地執行環境。
也可以用下面的方法來顯式的創建本地或者遠程執行環境:
scala version
// create a local stream execution environment val localEnv = StreamExecutionEnvironment .createLocalEnvironment() // create a remote stream execution environment val remoteEnv = StreamExecutionEnvironment .createRemoteEnvironment( "host", // hostname of JobManager 1234, // port of JobManager process "path/to/jarFile.jar" ) // JAR file to ship to the JobManager
java version
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment .createLocalEnvironment(); StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment .createRemoteEnvironment( "host", // hostname of JobManager 1234, // port of JobManager process "path/to/jarFile.jar" ); // JAR file to ship to the JobManager
接下來,我們使用env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
來將我們程序的時間語義設置為事件時間。執行環境提供了很多配置選項,例如:設置程序的並行度和程序是否開啟容錯機制。
3 讀取輸入流
一旦執行環境設置好,就該寫業務邏輯了。StreamExecutionEnvironment
提供了創建數據源的方法,這些方法可以從數據流中將數據攝取到程序中。數據流可以來自消息隊列或者文件系統,也可能是實時產生的(例如socket)。
在我們的例子里面,我們這樣寫:
scala version
val sensorData: DataStream[SensorReading] = env .addSource(new SensorSource)
java version
DataStream<SensorReading> sensorData = env .addSource(new SensorSource());
這樣就可以連接到傳感器測量數據的數據源並創建一個類型為SensorReading
的DataStream
了。Flink支持很多數據類型,我們將在接下來的章節里面講解。在我們的例子里面,我們的數據類型是一個定義好的Scala樣例類。SensorReading
樣例類包含了傳感器ID,數據的測量時間戳,以及測量溫度值。assignTimestampsAndWatermarks(new SensorTimeAssigner)
方法指定了如何設置事件時間語義的時間戳和水位線。有關SensorTimeAssigner
我們后面再講。
4 轉換算子的使用
一旦我們有一條DataStream,我們就可以在這條數據流上面使用轉換算子了。轉換算子有很多種。一些轉換算子可以產生一條新的DataStream,當然這個DataStream的類型可能是新類型。還有一些轉換算子不會改變原有DataStream的數據,但會將數據流分區或者分組。業務邏輯就是由轉換算子串起來組合而成的。
在我們的例子中,我們首先使用map()
轉換算子將傳感器的溫度值轉換成了攝氏溫度單位。然后,我們使用keyBy()
轉換算子將傳感器讀數流按照傳感器ID進行分區。接下來,我們定義了一個timeWindow()
轉換算子,這個算子將每個傳感器ID所對應的分區的傳感器讀數分配到了5秒鍾的滾動窗口中。
scala version
val avgTemp = sensorData .map(r => { val celsius = (r.temperature - 32) * (5.0 / 9.0) SensorReading(r.id, r.timestamp, celsius) }) .keyBy(_.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager)
java version
DataStream<T> avgTemp = sensorData .map(r -> { Double celsius = (r.temperature -32) * (5.0 / 9.0); return SensorReading(r.id, r.timestamp, celsius); }) .keyBy(r -> r.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager());
窗口轉換算子將在“窗口操作符”一章中講解。最后,我們使用了一個UDF函數來計算每個窗口的溫度的平均值。我們稍后將會討論UDF函數的實現。
5 輸出結果
流處理程序經常將它們的計算結果發送到一些外部系統中去,例如:Apache Kafka,文件系統,或者數據庫中。Flink提供了一個維護的很好的sink算子的集合,這些sink算子可以用來將數據寫入到不同的系統中去。我們也可以實現自己的sink算子。也有一些Flink程序並不會向第三方外部系統發送數據,而是將數據存儲到Flink系統內部,然后可以使用Flink的可查詢狀態的特性來查詢數據。
在我們的例子中,計算結果是一個DataStream[SensorReading]
數據記錄。每一條數據記錄包含了一個傳感器在5秒鍾的周期里面的平均溫度。計算結果組成的數據流將會調用print()
將計算結果寫到標准輸出。
avgTemp.print()
要注意一點,流的Sink算子的選擇將會影響應用程序端到端(end-to-end
)的一致性,具體就是應用程序的計算提供的到底是at-least-once
還是exactly-once
的一致性語義。應用程序端到端的一致性依賴於所選擇的流的Sink算子和Flink的檢查點算法的集成使用。
6 執行
當應用程序完全寫好時,我們可以調用StreamExecutionEnvironment.execute()
來執行應用程序。在我們的例子中就是我們的最后一行調用:
env.execute("Compute average sensor temperature")
Flink程序是惰性執行的。也就是說創建數據源和轉換算子的API調用並不會立刻觸發任何數據處理邏輯。API調用僅僅是在執行環境中構建了一個執行計划,這個執行計划包含了執行環境創建的數據源和所有的將要用在數據源上的轉換算子。只有當execute()
被調用時,系統才會觸發程序的執行。
構建好的執行計划將被翻譯成一個JobGraph
並提交到JobManager
上面去執行。根據執行環境的種類,一個JobManager
將會運行在一個本地線程中(如果是本地執行環境的化)或者JobGraph
將會被發送到一個遠程的JobManager
上面去。如果JobManager
遠程運行,那么JobGraph
必須和一個包含有所有類和應用程序的依賴的JAR包一起發送到遠程JobManager
。