0 簡介
數據可以存儲在不同的系統中,例如:文件系統,對象存儲系統(OSS),關系型數據庫,Key-Value存儲,搜索引擎索引,日志系統,消息隊列,等等。每一種系統都是給特定的應用場景設計的,在某一個特定的目標上超越了其他系統。今天的數據架構,往往包含着很多不同的存儲系統。在將一個組件加入到我們的系統中時,我們需要問一個問題:“這個組件和架構中的其他組件能多好的一起工作?”
添加一個像Flink這樣的數據處理系統,需要仔細的考慮。因為Flink沒有自己的存儲層,而是讀取數據和持久化數據都需要依賴外部存儲。所以,對於Flink,針對外部系統提供良好的讀取和寫入的連接器就很重要了。盡管如此,僅僅能夠讀寫外部系統對於Flink這樣想要提供任務故障情況下一致性保證的流處理器來講,是不夠的。
在本章中,我們將會討論source和sink的連接器。這些連接器影響了Flink的一致性保證,也提供了對於最流行的一些外部系統的讀寫的連接器。我們還將學習如何實現自定義source和sink連接器,以及如何實現可以向外部系統發送異步讀寫請求的函數。
1 應用的一致性保證
Flink的檢查點和恢復機制定期的會保存應用程序狀態的一致性檢查點。在故障的情況下,應用程序的狀態將會從最近一次完成的檢查點恢復,並繼續處理。盡管如此,可以使用檢查點來重置應用程序的狀態無法完全達到令人滿意的一致性保證。相反,source和sink的連接器需要和Flink的檢查點和恢復機制進行集成才能提供有意義的一致性保證。
為了給應用程序提供恰好處理一次語義的狀態一致性保證,應用程序的source連接器需要能夠將source的讀位置重置到之前保存的檢查點位置。當處理一次檢查點時,source操作符將會把source的讀位置持久化,並在恢復的時候從這些讀位置開始重新讀取。支持讀位置的檢查點的source連接器一般來說是基於文件的存儲系統,如:文件流或者Kafka source(檢查點會持久化某個正在消費的topic的讀偏移量)。如果一個應用程序從一個無法存儲和重置讀位置的source連接器攝入數據,那么當任務出現故障的時候,數據就會丟失。也就是說我們只能提供at-most-once)的一致性保證。
Fink的檢查點和恢復機制和可以重置讀位置的source連接器結合使用,可以保證應用程序不會丟失任何數據。盡管如此,應用程序可能會發出兩次計算結果,因為從上一次檢查點恢復的應用程序所計算的結果將會被重新發送一次(一些結果已經發送出去了,這時任務故障,然后從上一次檢查點恢復,這些結果將被重新計算一次然后發送出去)。所以,可重置讀位置的source和Flink的恢復機制不足以提供端到端的恰好處理一次語義,即使應用程序的狀態是恰好處理一次一致性級別。
一個志在提供端到端恰好處理一次語義一致性的應用程序需要特殊的sink連接器。sink連接器可以在不同的情況下使用兩種技術來達到恰好處理一次一致性語義:冪等性寫入和事務性寫入。
1.1 冪等性寫入
一個冪等操作無論執行多少次都會返回同樣的結果。例如,重復的向hashmap中插入同樣的key-value對就是冪等操作,因為頭一次插入操作之后所有的插入操作都不會改變這個hashmap,因為hashmap已經包含這個key-value對了。另一方面,append操作就不是冪等操作了,因為多次append同一個元素將會導致列表每次都會添加一個元素。在流處理程序中,冪等寫入操作是很有意思的,因為冪等寫入操作可以執行多次但不改變結果。所以它們可以在某種程度上緩和Flink檢查點機制帶來的重播計算結果的效應。
需要注意的是,依賴於冪等性sink來達到exactly-once語義的應用程序,必須保證在從檢查點恢復以后,它將會覆蓋之前已經寫入的結果。例如,一個包含有sink操作的應用在sink到一個key-value存儲時必須保證它能夠確定的計算出將要更新的key值。同時,從Flink程序sink到的key-value存儲中讀取數據的應用,在Flink從檢查點恢復的過程中,可能會看到不想看到的結果。當重播開始時,之前已經發出的計算結果可能會被更早的結果所覆蓋(因為在恢復過程中)。所以,一個消費Flink程序輸出數據的應用,可能會觀察到時間回退,例如讀到了比之前小的計數。也就是說,當流處理程序處於恢復過程中時,流處理程序的結果將處於不穩定的狀態,因為一些結果被覆蓋掉,而另一些結果還沒有被覆蓋。一旦重播完成,也就是說應用程序已經通過了之前出故障的點,結果將會繼續保持一致性。
1.2 事務性寫入
第二種實現端到端的恰好處理一次一致性語義的方法基於事務性寫入。其思想是只將最近一次成功保存的檢查點之前的計算結果寫入到外部系統中去。這樣就保證了在任務故障的情況下,端到端恰好處理一次語義。應用將被重置到最近一次的檢查點,而在這個檢查點之后並沒有向外部系統發出任何計算結果。通過只有當檢查點保存完成以后再寫入數據這種方法,事務性的方法將不會遭受冪等性寫入所遭受的重播不一致的問題。盡管如此,事務性寫入卻帶來了延遲,因為只有在檢查點完成以后,我們才能看到計算結果。
Flink提供了兩種構建模塊來實現事務性sink連接器:write-ahead-log(WAL,預寫式日志)sink和兩階段提交sink。WAL式sink將會把所有計算結果寫入到應用程序的狀態中,等接到檢查點完成的通知,才會將計算結果發送到sink系統。因為sink操作會把數據都緩存在狀態后段,所以WAL可以使用在任何外部sink系統上。盡管如此,WAL還是無法提供刀槍不入的恰好處理一次語義的保證,再加上由於要緩存數據帶來的狀態后段的狀態大小的問題,WAL模型並不十分完美。
與之形成對比的,2PC sink需要sink系統提供事務的支持或者可以模擬出事務特性的模塊。對於每一個檢查點,sink開始一個事務,然后將所有的接收到的數據都添加到事務中,並將這些數據寫入到sink系統,但並沒有提交(commit)它們。當事務接收到檢查點完成的通知時,事務將被commit,數據將被真正的寫入sink系統。這項機制主要依賴於一次sink可以在檢查點完成之前開始事務,並在應用程序從一次故障中恢復以后再commit的能力。
2PC協議依賴於Flink的檢查點機制。檢查點屏障是開始一個新的事務的通知,所有操作符自己的檢查點成功的通知是它們可以commit的投票,而作業管理器通知一個檢查點成功的消息是commit事務的指令。於WAL sink形成對比的是,2PC sinks依賴於sink系統和sink本身的實現可以實現恰好處理一次語義。更多的,2PC sink不斷的將數據寫入到sink系統中,而WAL寫模型就會有之前所述的問題。
不可重置的源 | 可重置的源 | |
---|---|---|
any sink | at-most-once | at-least-once |
冪等性sink | at-most-once | exactly-once(當從任務失敗中恢復時,存在暫時的不一致性) |
預寫式日志sink | at-most-once | at-least-once |
2PC sink | at-most-once | exactly-once |
2 Flink提供的連接器
Flink提供了讀寫很多存儲系統的連接器。消息隊列,日志系統,例如Apache Kafka, Kinesis, RabbitMQ等等這些是常用的數據源。在批處理環境中,數據流很可能是監聽一個文件系統,而當新的數據落盤的時候,讀取這些新數據。
在sink一端,數據流經常寫入到消息隊列中,以供接下來的流處理程序消費。數據流也可能寫入到文件系統中做持久化,或者交給批處理程序來進行分析。數據流還可能被寫入到key-value存儲或者關系型數據庫中,例如Cassandra,ElasticSearch或者MySQL中,這樣數據可供查詢,還可以在儀表盤中顯示出來。
不幸的是,對於大多數存儲系統並沒有標准接口,除了針對DBMS的JDBC。相反,每一個存儲系統都需要有自己的特定的連接器。所以,Flink需要維護針對不同存儲系統(消息隊列,日志系統,文件系統,k-v數據庫,關系型數據庫等等)的連接器實現。
Flink提供了針對Apache Kafka, Kinesis, RabbitMQ, Apache Nifi, 各種文件系統,Cassandra, Elasticsearch, 還有JDBC的連接器。除此之外,Apache Bahir項目還提供了額外的針對例如ActiveMQ, Akka, Flume, Netty, 和Redis等的連接器。
2.1 Apache Kafka Source連接器
Apache Kafka是一個分布式流式平台。它的核心是一個分布式的發布訂閱消息系統。
Kafka將事件流組織為所謂的topics。一個主題就是一個事件日志系統,Kafka可以保證主題中的數據在被讀取時和這些數據在被寫入時相同的順序。為了擴大讀寫的規模,主題可以分裂為多個分區,這些分區分布在一個集群上面。這時,讀寫順序的保證就限制到了分區這個粒度, Kafka並沒有提供從不同分區讀取數據時的順序保證。Kafka分區的讀位置稱為偏移量(offset)。
Kafka的依賴引入如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.7.1</version> </dependency>
Flink Kafka連接器並行的攝入事件流。每一個並行source任務可以從一個或者多個分區中讀取數據。任務將會跟蹤每一個分區當前的讀偏移量,然后將讀偏移量寫入到檢查點數據中。當從任務故障恢復時,讀偏移量將被恢復,而source任務將從檢查點保存的讀偏移量開始重新讀取數據。Flink Kafka連接器並不依賴Kafka自己的offset-tracking機制(基於消費者組實現)。下圖展示了分區如何分配給source實例。
Kafka source連接器使用如下代碼創建
val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "test") val stream: DataStream[String] = env.addSource( new FlinkKafkaConsumer[String]( "topic", new SimpleStringSchema(), properties))
構造器接受三個參數。第一個參數定義了從哪些topic中讀取數據,可以是一個topic,也可以是topic列表,還可以是匹配所有想要讀取的topic的正則表達式。當從多個topic中讀取數據時,Kafka連接器將會處理所有topic的分區,將這些分區的數據放到一條流中去。
第二個參數是一個DeserializationSchema或者KeyedDeserializationSchema。Kafka消息被存儲為原始的字節數據,所以需要反序列化成Java或者Scala對象。上例中使用的SimpleStringSchema,是一個內置的DeserializationSchema,它僅僅是簡單的將字節數組反序列化成字符串。DeserializationSchema和KeyedDeserializationSchema是公共的接口,所以我們可以自定義反序列化邏輯。
第三個參數是一個Properties對象,設置了用來讀寫的Kafka客戶端的一些屬性。
為了抽取事件時間的時間戳然后產生水印,我們可以通過調用
FlinkKafkaConsumer.assignTimestampsAndWatermark()
方法為Kafka消費者提供AssignerWithPeriodicWatermark或者AssignerWithPucntuatedWatermark。每一個assigner都將被應用到每個分區,來利用每一個分區的順序保證特性。source實例將會根據水印的傳播協議聚合所有分區的水印。
2.2 Apache Kafka Sink連接器
添加依賴:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.7.1</version> </dependency>
下面的例子展示了如何創建一個Kafka sink
val stream: DataStream[String] = ... val myProducer = new FlinkKafkaProducer[String]( "localhost:9092", // broker list "topic", // target topic new SimpleStringSchema) // serialization schema stream.addSink(myProducer)
2.3 Kakfa Sink的at-least-once保證
Flink的Kafka sink提供了基於配置的一致性保證。Kafka sink使用下面的條件提供了至少處理一次保證:
- Flink檢查點機制開啟,所有的數據源都是可重置的。
- 當寫入失敗時,sink連接器將會拋出異常,使得應用程序掛掉然后重啟。這是默認行為。應用程序內部的Kafka客戶端還可以配置為重試寫入,只要提前聲明當寫入失敗時,重試幾次這樣的屬性(retries property)。
- sink連接器在完成它的檢查點之前會等待Kafka發送已經將數據寫入的通知。
2.4 Kafka Sink的恰好處理一次語義保證
Kafka 0.11版本引入了事務寫特性。由於這個新特性,Flink Kafka sink可以為輸出結果提供恰好處理一次語義的一致性保證,只要經過合適的配置就行。Flink程序必須開啟檢查點機制,並從可重置的數據源進行消費。FlinkKafkaProducer還提供了包含Semantic參數的構造器來控制sink提供的一致性保證。可能的取值如下:
- Semantic.NONE,不提供任何一致性保證。數據可能丟失或者被重寫多次。
- Semantic.AT_LEAST_ONCE,保證無數據丟失,但可能被處理多次。這個是默認設置。
- Semantic.EXACTLY_ONCE,基於Kafka的事務性寫入特性實現,保證每條數據恰好處理一次。
2.5 文件系統source連接器
Apache Flink針對文件系統實現了一個可重置的source連接器,將文件看作流來讀取數據。如下面的例子所示:
val lineReader = new TextInputFormat(null) val lineStream: DataStream[String] = env.readFile[String]( lineReader, // The FileInputFormat "hdfs:///path/to/my/data", // The path to read FileProcessingMode .PROCESS_CONTINUOUSLY, // The processing mode 30000L) // The monitoring interval in ms
StreamExecutionEnvironment.readFile()接收如下參數:
- FileInputFormat參數,負責讀取文件中的內容。
- 文件路徑。如果文件路徑指向單個文件,那么將會讀取這個文件。如果路徑指向一個文件夾,FileInputFormat將會掃描文件夾中所有的文件。
- PROCESS_CONTINUOUSLY將會周期性的掃描文件,以便掃描到文件新的改變。
- 30000L表示多久掃描一次監聽的文件。
FileInputFormat是一個特定的InputFormat,用來從文件系統中讀取文件。FileInputFormat分兩步讀取文件。首先掃描文件系統的路徑,然后為所有匹配到的文件創建所謂的input splits。一個input split將會定義文件上的一個范圍,一般通過讀取的開始偏移量和讀取長度來定義。在將一個大的文件分割成一堆小的splits以后,這些splits可以分發到不同的讀任務,這樣就可以並行的讀取文件了。FileInputFormat的第二步會接收一個input split,讀取被split定義的文件范圍,然后返回對應的數據。
DataStream應用中使用的FileInputFormat需要實現CheckpointableInputFormat接口。這個接口定義了方法來做檢查點和重置文件片段的當前的讀取位置。
在Flink 1.7中,Flink提供了一些類,這些類繼承了FileInputFormat,並實現了CheckpointableInputFormat接口。TextInputFormat一行一行的讀取文件,而CsvInputFormat使用逗號分隔符來讀取文件。
2.6 文件系統sink連接器
在將流處理應用配置成exactly-once檢查點機制,以及配置成所有源數據都能在故障的情況下可以重置,Flink的StreamingFileSink提供了端到端的恰好處理一次語義保證。下面的例子展示了StreamingFileSink的使用方式。
val input: DataStream[String] = … val sink: StreamingFileSink[String] = StreamingFileSink .forRowFormat( new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8")) .build() input.addSink(sink)
當StreamingFileSink接到一條數據,這條數據將被分配到一個桶(bucket)中。一個桶是我們配置的“/base/path”的子目錄。
Flink使用BucketAssigner來分配桶。BucketAssigner是一個公共的接口,為每一條數據返回一個BucketId,BucketId決定了數據被分配到哪個子目錄。如果沒有指定BucketAssigner,Flink將使用DateTimeBucketAssigner來將每條數據分配到每個一個小時所產生的桶中去,基於數據寫入的處理時間(機器時間,牆上時鍾)。
StreamingFileSink提供了exactly-once輸出的保證。sink通過一個commit協議來達到恰好處理一次語義的保證。這個commit協議會將文件移動到不同的階段,有以下狀態:in progress,pending,finished。這個協議基於Flink的檢查點機制。當Flink決定roll a file時,這個文件將被關閉並移動到pending狀態,通過重命名文件來實現。當下一個檢查點完成時,pending文件將被移動到finished狀態,同樣是通過重命名來實現。
一旦任務故障,sink任務需要將處於in progress狀態的文件重置到上一次檢查點的寫偏移量。這個可以通過關閉當前in progress的文件,並將文件結尾無效的部分丟棄掉來實現。
3 實現自定義源函數
DataStream API提供了兩個接口來實現source連接器:
- SourceFunction和RichSourceFunction可以用來定義非並行的source連接器,source跑在單任務上。
- ParallelSourceFunction和RichParallelSourceFunction可以用來定義跑在並行實例上的source連接器。
除了並行於非並行的區別,這兩種接口完全一樣。就像process function的rich版本一樣,RichSourceFunction和RichParallelSourceFunction的子類可以override open()和close()方法,也可以訪問RuntimeContext,RuntimeContext提供了並行任務實例的數量,當前任務實例的索引,以及一些其他信息。
SourceFunction和ParallelSourceFunction定義了兩種方法:
- void run(SourceContext ctx)
- cancel()
run()方法用來讀取或者接收數據然后將數據攝入到Flink應用中。根據接收數據的系統,數據可能是推送的也可能是拉取的。Flink僅僅在特定的線程調用run()方法一次,通常情況下會是一個無限循環來讀取或者接收數據並發送數據。任務可以在某個時間點被顯式的取消,或者由於流是有限流,當數據被消費完畢時,任務也會停止。
當應用被取消或者關閉時,cancel()方法會被Flink調用。為了優雅的關閉Flink應用,run()方法需要在cancel()被調用以后,立即終止執行。下面的例子顯示了一個簡單的源函數的例子:從0數到Long.MaxValue。
class CountSource extends SourceFunction[Long] { var isRunning: Boolean = true override def run(ctx: SourceFunction.SourceContext[Long]) = { var cnt: Long = -1 while (isRunning && cnt < Long.MaxValue) { cnt += 1 ctx.collect(cnt) } } override def cancel() = isRunning = false }
3.1 可重置的源函數
之前我們講過,應用程序只有使用可以重播輸出數據的數據源時,才能提供令人滿意的一致性保證。如果外部系統暴露了獲取和重置讀偏移量的API,那么source函數就可以重播源數據。這樣的例子包括一些能夠提供文件流的偏移量的文件系統,或者提供seek方法用來移動到文件的特定位置的文件系統。或者Apache Kafka這種可以為每一個主題的分區提供偏移量並且可以設置分區的讀位置的系統。一個反例就是source連接器連接的是socket,socket將會立即丟棄已經發送過的數據。
支持重播輸出的源函數需要和Flink的檢查點機制集成起來,還需要在檢查點被處理時,持久化當前所有的讀取位置。當應用從一個保存點(savepoint)恢復或者從故障恢復時,Flink會從最近一次的檢查點或者保存點中獲取讀偏移量。如果程序開始時並不存在狀態,那么讀偏移量將會被設置到一個默認值。一個可重置的源函數需要實現CheckpointedFunction接口,還需要能夠存儲讀偏移量和相關的元數據,例如文件的路徑,分區的ID。這些數據將被保存在list state或者union list state中。
下面的例子將CountSource重寫為可重置的數據源。
scala version
class ResettableCountSource extends SourceFunction[Long] with CheckpointedFunction { var isRunning: Boolean = true var cnt: Long = _ var offsetState: ListState[Long] = _ override def run(ctx: SourceFunction.SourceContext[Long]) = { while (isRunning && cnt < Long.MaxValue) { // synchronize data emission and checkpoints ctx.getCheckpointLock.synchronized { cnt += 1 ctx.collect(cnt) } } } override def cancel() = isRunning = false override def snapshotState( snapshotCtx: FunctionSnapshotContext ): Unit = { // remove previous cnt offsetState.clear() // add current cnt offsetState.add(cnt) } override def initializeState( initCtx: FunctionInitializationContext): Unit = { val desc = new ListStateDescriptor[Long]( "offset", classOf[Long]) offsetState = initCtx .getOperatorStateStore .getListState(desc) // initialize cnt variable val it = offsetState.get() cnt = if (null == it || !it.iterator().hasNext) { -1L } else { it.iterator().next() } } }
4 實現自定義sink函數
DataStream API中,任何運算符或者函數都可以向外部系統發送數據。DataStream不需要最終流向sink運算符。例如,我們可能實現了一個FlatMapFunction,這個函數將每一個接收到的數據通過HTTP POST請求發送出去,而不使用Collector發送到下一個運算符。DataStream API也提供了SinkFunction接口以及對應的rich版本RichSinkFunction抽象類。SinkFunction接口提供了一個方法:
void invode(IN value, Context ctx)
SinkFunction的Context可以訪問當前處理時間,當前水位線,以及數據的時間戳。
下面的例子展示了一個簡單的SinkFunction,可以將傳感器讀數寫入到socket中去。需要注意的是,我們需要在啟動Flink程序前啟動一個監聽相關端口的進程。否則將會拋出ConnectException異常。可以運行“nc -l localhost 9191”命令。
val readings: DataStream[SensorReading] = ... // write the sensor readings to a socket readings.addSink(new SimpleSocketSink("localhost", 9191)) // set parallelism to 1 because only one thread can write to a socket .setParallelism(1) // ----- class SimpleSocketSink(val host: String, val port: Int) extends RichSinkFunction[SensorReading] { var socket: Socket = _ var writer: PrintStream = _ override def open(config: Configuration): Unit = { // open socket and writer socket = new Socket(InetAddress.getByName(host), port) writer = new PrintStream(socket.getOutputStream) } override def invoke( value: SensorReading, ctx: SinkFunction.Context[_]): Unit = { // write sensor reading to socket writer.println(value.toString) writer.flush() } override def close(): Unit = { // close writer and socket writer.close() socket.close() } }
之前我們討論過,端到端的一致性保證建立在sink連接器的屬性上面。為了達到端到端的恰好處理一次語義的目的,應用程序需要冪等性的sink連接器或者事務性的sink連接器。上面例子中的SinkFunction既不是冪等寫入也不是事務性的寫入。由於socket具有只能添加(append-only)這樣的屬性,所以不可能實現冪等性的寫入。又因為socket不具備內置的事務支持,所以事務性寫入就只能使用Flink的WAL sink特性來實現了。接下來我們將學習如何實現冪等sink連接器和事務sink連接器。
4.1 冪等sink連接器
對於大多數應用,SinkFunction接口足以實現一個冪等性寫入的sink連接器了。需要以下兩個條件:
- 結果數據必須具有確定性的key,在這個key上面冪等性更新才能實現。例如一個計算每分鍾每個傳感器的平均溫度值的程序,確定性的key值可以是傳感器的ID和每分鍾的時間戳。確定性的key值,對於在故障恢復的場景下,能夠正確的覆蓋結果非常的重要。
- 外部系統支持針對每個key的更新,例如關系型數據庫或者key-value存儲。
下面的例子展示了如何實現一個針對JDBC數據庫的冪等寫入sink連接器,這里使用的是Apache Derby數據庫。
val readings: DataStream[SensorReading] = ... // write the sensor readings to a Derby table readings.addSink(new DerbyUpsertSink) // ----- class DerbyUpsertSink extends RichSinkFunction[SensorReading] { var conn: Connection = _ var insertStmt: PreparedStatement = _ var updateStmt: PreparedStatement = _ override def open(parameters: Configuration): Unit = { // connect to embedded in-memory Derby conn = DriverManager.getConnection( "jdbc:derby:memory:flinkExample", new Properties()) // prepare insert and update statements insertStmt = conn.prepareStatement( "INSERT INTO Temperatures (sensor, temp) VALUES (?, ?)") updateStmt = conn.prepareStatement( "UPDATE Temperatures SET temp = ? WHERE sensor = ?") } override def invoke(SensorReading r, context: Context[_]): Unit = { // set parameters for update statement and execute it updateStmt.setDouble(1, r.temperature) updateStmt.setString(2, r.id) updateStmt.execute() // execute insert statement // if update statement did not update any row if (updateStmt.getUpdateCount == 0) { // set parameters for insert statement insertStmt.setString(1, r.id) insertStmt.setDouble(2, r.temperature) // execute insert statement insertStmt.execute() } } override def close(): Unit = { insertStmt.close() updateStmt.close() conn.close() } }
由於Apache Derby並沒有提供內置的UPSERT方法,所以這個sink連接器實現了UPSERT寫。具體實現方法是首先去嘗試更新一行數據,如果這行數據不存在,則插入新的一行數據。
4.2 事務性sink連接器
事務寫入sink連接器需要和Flink的檢查點機制集成,因為只有在檢查點成功完成以后,事務寫入sink連接器才會向外部系統commit數據。
為了簡化事務性sink的實現,Flink提供了兩個模版用來實現自定義sink運算符。這兩個模版都實現了CheckpointListener接口。CheckpointListener接口將會從作業管理器接收到檢查點完成的通知。
- GenericWriteAheadSink模版會收集檢查點之前的所有的數據,並將數據存儲到sink任務的運算符狀態中。狀態保存到了檢查點中,並在任務故障的情況下恢復。當任務接收到檢查點完成的通知時,任務會將所有的數據寫入到外部系統中。
- TwoPhaseCommitSinkFunction模版利用了外部系統的事務特性。對於每一個檢查點,任務首先開始一個新的事務,並將接下來所有的數據都寫到外部系統的當前事務上下文中去。當任務接收到檢查點完成的通知時,sink連接器將會commit這個事務。
GENERICWRITEAHEADSINK
GenericWriteAheadSink使得sink運算符可以很方便的實現。這個運算符和Flink的檢查點機制集成使用,目標是將每一條數據恰好一次寫入到外部系統中去。需要注意的是,在發生故障的情況下,write-ahead log sink可能會不止一次的發送相同的數據。所以GenericWriteAheadSink無法提供完美無缺的恰好處理一次語義的一致性保證,而是僅能提供at-least-once這樣的保證。我們接下來詳細的討論這些場景。
GenericWriteAheadSink的原理是將接收到的所有數據都追加到有檢查點分割好的預寫式日志中去。每當sink運算符碰到檢查點屏障,運算符將會開辟一個新的section,並將接下來的所有數據都追加到新的section中去。WAL(預寫式日志)將會保存到運算符狀態中。由於log能被恢復,所有不會有數據丟失。
當GenericWriteAheadSink接收到檢查點完成的通知時,將會發送對應檢查點的WAL中存儲的所有數據。當所有數據發送成功,對應的檢查點必須在內部提交。
檢查點的提交分兩步。第一步,sink持久化檢查點被提交的信息。第二步,刪除WAL中所有的數據。我們不能將commit信息保存在Flink應用程序狀態中,因為狀態不是持久化的,會在故障恢復時重置狀態。相反,GenericWriteAheadSink依賴於可插拔的組件在一個外部持久化存儲中存儲和查找提交信息。這個組件就是CheckpointCommitter。
繼承GenericWriteAheadSink的運算符需要提供三個構造器函數。
- CheckpointCommitter
- TypeSerializer,用來序列化輸入數據。
- 一個job ID,傳給CheckpointCommitter,當應用重啟時可以識別commit信息。
還有,write-ahead運算符需要實現一個單獨的方法:
boolean sendValues(Iterable<IN> values, long chkpntId, long timestamp)
當檢查點完成時,GenericWriteAheadSink調用sendValues()方法來將數據寫入到外部存儲系統中。這個方法接收一個檢查點對應的所有數據的迭代器,檢查點的ID,檢查點被處理時的時間戳。當數據寫入成功時,方法必須返回true,寫入失敗返回false。
下面的例子展示了如何實現一個寫入到標准輸出的write-ahead sink。它使用了FileCheckpointCommitter。
val readings: DataStream[SensorReading] = ... // write the sensor readings to the standard out via a write-ahead log readings.transform( "WriteAheadSink", new SocketWriteAheadSink) class StdOutWriteAheadSink extends GenericWriteAheadSink[SensorReading]( // CheckpointCommitter that commits // checkpoints to the local filesystem new FileCheckpointCommitter(System.getProperty("java.io.tmpdir")), // Serializer for records createTypeInformation[SensorReading] .createSerializer(new ExecutionConfig), // Random JobID used by the CheckpointCommitter UUID.randomUUID.toString) { override def sendValues( readings: Iterable[SensorReading], checkpointId: Long, timestamp: Long): Boolean = { for (r <- readings.asScala) { // write record to standard out println(r) } true } }
之前我們講過,GenericWriteAheadSink無法提供完美的exactly-once保證。有兩個故障狀況會導致數據可能被發送不止一次。
- 當任務執行sendValues()方法時,程序掛掉了。如果外部系統無法原子性的寫入所有數據(要么都寫入要么都不寫),一些數據可能會寫入,而另一些數據並沒有被寫入。由於checkpoint還沒有commit,所以在任務恢復的過程中一些數據可能會被再次寫入。
- 所有數據都寫入成功了,sendValues()方法也返回true了;但在CheckpointCommitter方法被調用之前程序掛了,或者CheckpointCommitter在commit檢查點時失敗了。那么在恢復的過程中,所有未被提交的檢查點將會被重新寫入。
TWOPHASECOMMITSINKFUNCTION
Flink提供了TwoPhaseCommitSinkFunction接口來簡化sink函數的實現。這個接口保證了端到端的exactly-once語義。2PC sink函數是否提供這樣的一致性保證取決於我們的實現細節。我們需要討論一個問題:“2PC協議是否開銷太大?”
通常來講,為了保證分布式系統的一致性,2PC是一個非常昂貴的方法。盡管如此,在Flink的語境下,2PC協議針對每一個檢查點只運行一次。TwoPhaseCommitSinkFunction和WAL sink很相似,不同點在於前者不會將數據收集到state中,而是會寫入到外部系統事務的上下文中。
TwoPhaseCommitSinkFunction實現了以下協議。在sink任務發送出第一條數據之前,任務將在外部系統中開始一個事務,所有接下來的數據將被寫入這個事務的上下文中。當作業管理器初始化檢查點並將檢查點屏障插入到流中的時候,2PC協議的投票階段開始。當運算符接收到檢查點屏障,運算符將保存它的狀態,當保存完成時,運算符將發送一個acknowledgement信息給作業管理器。當sink任務接收到檢查點屏障時,運算符將會持久化它的狀態,並准備提交當前的事務,以及acknowledge JobManager中的檢查點。發送給作業管理器的acknowledgement信息類似於2PC協議中的commit投票。sink任務還不能提交事務,因為它還沒有保證所有的任務都已經完成了它們的檢查點操作。sink任務也會為下一個檢查點屏障之前的所有數據開始一個新的事務。
當作業管理器成功接收到所有任務實例發出的檢查點操作成功的通知時,作業管理器將會把檢查點完成的通知發送給所有感興趣的任務。這里的通知對應於2PC協議的提交命令。當sink任務接收到通知時,它將commit所有處於開啟狀態的事務。一旦sink任務acknowledge了檢查點操作,它必須能夠commit對應的事務,即使任務發生故障。如果commit失敗,數據將會丟失。
讓我們總結一下外部系統需要滿足什么樣的要求:
- 外部系統必須提供事務支持,或者sink的實現能在外部系統上模擬事務功能。
- 在檢查點操作期間,事務必須處於open狀態,並接收這段時間數據的持續寫入。
- 事務必須等到檢查點操作完成的通知到來才可以提交。在恢復周期中,可能需要一段時間等待。如果sink系統關閉了事務(例如超時了),那么未被commit的數據將會丟失。
- sink必須在進程掛掉后能夠恢復事務。一些sink系統會提供事務ID,用來commit或者abort一個開始的事務。
- commit一個事務必須是一個冪等性操作。sink系統或者外部系統能夠觀察到事務已經被提交,或者重復提交並沒有副作用。
下面的例子可能會讓上面的一些概念好理解一些。
class TransactionalFileSink(val targetPath: String, val tempPath: String) extends TwoPhaseCommitSinkFunction[(String, Double), String, Void]( createTypeInformation[String].createSerializer(new ExecutionConfig), createTypeInformation[Void].createSerializer(new ExecutionConfig)) { var transactionWriter: BufferedWriter = _ // Creates a temporary file for a transaction into // which the records are written. override def beginTransaction(): String = { // path of transaction file // is built from current time and task index val timeNow = LocalDateTime.now(ZoneId.of("UTC")) .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask val transactionFile = s"$timeNow-$taskIdx" // create transaction file and writer val tFilePath = Paths.get(s"$tempPath/$transactionFile") Files.createFile(tFilePath) this.transactionWriter = Files.newBufferedWriter(tFilePath) println(s"Creating Transaction File: $tFilePath") // name of transaction file is returned to // later identify the transaction transactionFile } /** Write record into the current transaction file. */ override def invoke( transaction: String, value: (String, Double), context: Context[_]): Unit = { transactionWriter.write(value.toString) transactionWriter.write('\n') } /** Flush and close the current transaction file. */ override def preCommit(transaction: String): Unit = { transactionWriter.flush() transactionWriter.close() } // Commit a transaction by moving // the precommitted transaction file // to the target directory. override def commit(transaction: String): Unit = { val tFilePath = Paths.get(s"$tempPath/$transaction") // check if the file exists to ensure // that the commit is idempotent if (Files.exists(tFilePath)) { val cFilePath = Paths.get(s"$targetPath/$transaction") Files.move(tFilePath, cFilePath) } } // Aborts a transaction by deleting the transaction file. override def abort(transaction: String): Unit = { val tFilePath = Paths.get(s"$tempPath/$transaction") if (Files.exists(tFilePath)) { Files.delete(tFilePath) } } }
TwoPhaseCommitSinkFunction[IN, TXN, CONTEXT]包含如下三個范型參數:
- IN表示輸入數據的類型。
- TXN定義了一個事務的標識符,可以用來識別和恢復事務。
- CONTEXT定義了自定義的上下文。
TwoPhaseCommitSinkFunction的構造器需要兩個TypeSerializer。一個是TXN的類型,另一個是CONTEXT的類型。
最后,TwoPhaseCommitSinkFunction定義了五個需要實現的方法:
- beginTransaction(): TXN開始一個事務,並返回事務的標識符。
- invoke(txn: TXN, value: IN, context: Context[_]): Unit將值寫入到當前事務中。
- preCommit(txn: TXN): Unit預提交一個事務。一個預提交的事務不會接收新的寫入。
- commit(txn: TXN): Unit提交一個事務。這個操作必須是冪等的。
- abort(txn: TXN): Unit終止一個事務。