前言、flink介紹:
Apache Flink 是一個分布式處理引擎,用於在無界和有界數據流上進行有狀態的計算。通過對時間精確控制以及狀態化控制,Flink能夠運行在任何處理無界流的應用中,同時對有界流,則由一些專為固定數據集設計的算法和數據結構進行了內部處理,從而提升了性能。
1、flink特性
(1)Flink是一個開源的流處理框架,它具有以下特點:
- 分布式:Flink程序可以運行在多台機器上。
- 高性能:處理性能比較高。
- 高可用:由於Flink程序本身是穩定的,因此它支持高可用性。
- 准確:Flink可以保證數據處理的准確性。
Flink主要由Java代碼實現,它同時支持實時流處理和批處理。對於Flink而言,作為一個流處理框架,批數據只是流數據的一個極限特例而已。此外,Flink還支持迭代計算、內存管理和程序優化,這是它的原生特性。
(2)優勢:
- 流式優先:Flink可以連續處理流式數據。
- 容錯:Flink提供有狀態的計算,可以記錄數據的處理狀態,當數據處理失敗的時候,能夠無縫地從失敗中恢復,並保持Exactly-once。
- 可伸縮:Flink中的一個集群支持上千個節點。
- 性能:Flink支持高吞吐(單位時間內可大量完成處理的數據操作)、低延遲(可快速支持海量數據)。
2、flink架構
Flink架構可以分為4層,包括Deploy層、Core層、API層和Library層
-
Deploy層:該層主要涉及Flink的部署模式,Flink支持多種部署模式——本地、集群(Standalone/YARN)和雲服務器(GCE/EC2)。
-
Core層:該層提供了支持Flink計算的全部核心實現,為API層提供基礎服務。
-
API層:該層主要實現了面向無界Stream的流處理和面向Batch的批處理API,其中流處理對應DataStream API,批處理對應DataSet API。
-
Library層:該層也被稱為Flink應用框架層,根據API層的划分,在API層之上構建的滿足特定應用的實現計算框架,也分別對應於面向流處理和面向批處理兩類。面向流處理支持CEP(復雜事件處理)、基於SQL-like的操作(基於Table的關系操作);面向批處理支持FlinkML(機器學習庫)、Gelly(圖處理)、Table 操作。
一、相關概念:
1、watermark
watermark是一種衡量Event Time進展的機制,它是數據本身的一個隱藏屬性。通常基於Event Time的數據,自身都包含一個timestamp;
1)作用:
watermark是用於處理亂序事件的,通常用watermark機制結合window來實現。流處理從事件產生、到流經source、再到operator,中間是有一個過程和時間。大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡等原因,導致亂序的產生(out-of-order或late element)。對於late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了。這個機制就是watermark。
2、 CheckPoint
(2.1)概述
為了保證State的容錯性,Flink需要對State進行CheckPoint。CheckPoint是Flink實現容錯機制的核心功能,它能夠根據配置周期性地基於Stream中各個Operator/Task的狀態來生成快照,從而將這些狀態數據定期持久化存儲下來。Flink程序一旦意外崩潰,重新運行程序時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程序數據異常。
(2.2)使用說明
1)Checkpoint 在默認的情況下僅用於恢復失敗的作業,並不保留,當程序取消時 checkpoint 就會被刪除。
2) 默認情況下,CheckPoint功能是Disabled(禁用)的,使用時需要先開啟它。
env.enableCheckpointing(1000)
(2.3)目錄結構
checkpoint 由元數據文件、數據文件(與 state backend 相關)組成。可通過配置文件中 “state.checkpoints.dir” 配置項來指定元數據文件和數據文件的存儲路徑,另外也可以在代碼中針對單個作業特別指定該配置項。
當前的 checkpoint 目錄結構如下所示:
/user-defined-checkpoint-dir /{job-id} | + --shared/ + --taskowned/ + --chk-1/ + --chk-2/ + --chk-3/ ...
在hdfs中的存儲結構
其中 SHARED 目錄保存了可能被多個 checkpoint 引用的文件,TASKOWNED 保存了不會被 JobManager 刪除的文件,EXCLUSIVE 則保存那些僅被單個 checkpoint 引用的文件。
3、Flink基本組件
Flink中提供了3個組件,包括DataSource、Transformation和DataSink。
-
DataSource:表示數據源組件,主要用來接收數據,目前官網提供了readTextFile、socketTextStream、fromCollection以及一些第三方的Source。
-
Transformation:表示算子,主要用來對數據進行處理,比如Map、FlatMap、Filter、Reduce、Aggregation等。
-
DataSink:表示輸出組件,主要用來把計算的結果輸出到其他存儲介質中,比如writeAsText以及Kafka、Redis、Elasticsearch等第三方Sink組件。因此,想要組裝一個Flink Job,至少需要這3個組件。
即Flink Job=DataSource+Transformation+DataSink
二、Flink DataStreams API
1、DataStreams操作
獲得一個StreamExecutionEnvironment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");
使用轉換函數在DataStream上調用方法轉換(通過將原始集合中的每個String轉換為Integer,將創建一個新的DataStream)
DataStream<String> input = ...; DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
writeAsText(String path)
print()
1)同步執行:一旦觸發調用execute()
上StreamExecutionEnvironment,
根據ExecutionEnvironment
執行類型的不同,在本地計算機上觸發或將job提交到群集上執行。該execute()
方法將等待作業完成,然后返回JobExecutionResult
,其中包含執行時間和累加器結果。
2)異步執行:調用觸發異步作業執行executeAysnc()
的StreamExecutionEnvironment
。它將返回一個JobClient
與提交的作業進行通信的。
final JobClient jobClient = env.executeAsync(); final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
eg:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(value -> value.f0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
2、dataSource
基於文件:
readTextFile(path)
-TextInputFormat
逐行讀取文本文件,即符合規范的文件,並將其作為字符串返回。readFile(fileInputFormat, path)
-根據指定的文件輸入格式讀取(一次)文件。readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
-這是前兩個內部調用的方法。它path
根據給定的讀取文件fileInputFormat
。根據提供的內容watchType
,此源可以定期(每interval
ms)監視路徑中的新數據(FileProcessingMode.PROCESS_CONTINUOUSLY
),或者對路徑中當前的數據進行一次處理並退出(FileProcessingMode.PROCESS_ONCE
)。使用pathFilter
,用戶可以進一步從文件中排除文件。
基於套接字:
socketTextStream
-從套接字讀取。元素可以由定界符分隔。
基於集合:
fromCollection(Collection)
-從Java Java.util.Collection創建數據流。集合中的所有元素必須具有相同的類型。fromCollection(Iterator, Class)
-從迭代器創建數據流。該類指定迭代器返回的元素的數據類型。fromElements(T ...)
-從給定的對象序列創建數據流。所有對象必須具有相同的類型。fromParallelCollection(SplittableIterator, Class)
-從迭代器並行創建數據流。該類指定迭代器返回的元素的數據類型。generateSequence(from, to)
-在給定間隔內並行生成數字序列。
自定義:
addSource
-附加新的源功能。例如,Apache Kafka,可以使用 addSource(new FlinkKafkaConsumer<>(...))
。
eg:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile"); // read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile"); // read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .types(Integer.class, String.class, Double.class); // read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .includeFields("10010") // take the first and the fourth field
.types(String.class, Double.class); // read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .pojoType(Person.class, "name", "age", "zipcode"); // read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples = env.createInput(HadoopInputs.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file")); // creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar"); // generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000); // Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData = env.createInput( JdbcInputFormat.buildJdbcInputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:persons") .setQuery("select name, age from persons") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) .finish() ); // Note: Flink's program compiler needs to infer the data types of the data items which are returned // by an InputFormat. If this information cannot be automatically inferred, it is necessary to // manually provide the type information as shown in the examples above.
對於基於文件的輸入,當輸入路徑為目錄時,默認情況下不枚舉嵌套文件。而是只讀取基本目錄中的文件,而忽略嵌套文件。可以通過recursive.file.enumeration
配置參數啟用嵌套文件的遞歸枚舉,如以下示例所示:
// enable recursive enumeration of nested input files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // create a configuration object
Configuration parameters = new Configuration(); // set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true); // pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files") .withParameters(parameters);
3、DataStream轉換
(1)數據接收器
writeAsText()
/TextOutputFormat
-將元素按行寫為字符串。通過調用每個元素的toString()方法獲得字符串。writeAsCsv(...)
/CsvOutputFormat
-將元組寫為逗號分隔的值文件。行和字段定界符是可配置的。每個字段的值來自對象的toString()方法。print()
/printToErr()
-在標准輸出/標准錯誤流上打印每個元素的toString()值。可選地,可以提供前綴(msg),該前綴在輸出之前。這可以幫助區分打印的不同調用。如果並行度大於1,則輸出之前還將帶有產生輸出的任務的標識符。writeUsingOutputFormat()
/FileOutputFormat
-的方法和自定義文件輸出基類。支持自定義對象到字節的轉換。writeToSocket
-根據SerializationSchema
addSink
-調用自定義接收器功能。Flink與其他系統(例如Apache Kafka)的連接器捆綁在一起,這些系統已實現為接收器功能。
請注意:
1)write*()
方法DataStream
主要用於調試目的。它們不參與Flink的檢查點(這些功能通常具有至少一次語義)。
2)刷新到目標系統的數據取決於OutputFormat的實現,即並非所有發送到OutputFormat的元素都立即顯示在目標系統中。同樣,在失敗的情況下,這些記錄可能會丟失。
3)為了將流可靠、准確地一次傳輸到文件系統中,請使用StreamingFileSink
。同樣,通過該.addSink(...)
方法的自定義實現可以參與Flink一次精確語義的檢查點。
eg:
// text data
DataSet<String> textData = // [...] // write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS"); // write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS"); // write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE); // tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|"); // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file"); // this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file", new TextFormatter<Tuple2<Integer, Integer>>() { public String format (Tuple2<Integer, Integer> value) { return value.f1 + " - " + value.f0; } });
自定義輸出格式
DataSet<Tuple3<String, Integer, Double>> myResult = [...] // write Tuple DataSet to a relational database
myResult.output( // build and configure OutputFormat
JdbcOutputFormat.buildJdbcOutputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:persons") .setQuery("insert into persons (name, age, height) values (?,?,?)") .finish() );
本地排序輸出
可以使用元組字段位置或字段表達式按指定順序在指定字段上對數據接收器的輸出進行本地排序。這適用於每種輸出格式。
eg:
DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...] // sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print(); // sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print(); // sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...); // sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...); // sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);
注:目前尚不支持全局排序的輸出。
(2)控制延遲(設置使用流處理、批處理)
默認情況下,元素不會在網絡上一對一傳輸(產生不必要的網絡通信開銷),通常會進行緩沖。緩沖區的大小可以在Flink配置文件中設置。控制吞吐量和延遲,可以在執行環境(或各個運算符)上使用來設置緩沖區填充的最大等待時間env.setBufferTimeout(timeoutMillis)
。超過設置時間,即使緩沖區未滿,也會自動發送緩沖區。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setBufferTimeout(timeoutMillis);//默認是100ms env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
為了最大程度地提高吞吐量,設置setBufferTimeout(-1)表示永不超時
,僅在緩沖區已滿時才刷新緩沖區。最大程度地減少延遲,可以將超時設置為接近0的值(例如5或10 ms)(避免將緩沖區超時設置為0,可能導致嚴重的性能下降)。
4、執行模式(批處理/流式傳輸)
DataStream API支持不同的運行時執行模式,可以根據用例的要求和工作特征從中選擇運行模式。
DataStream API有“經典”執行行為,稱之為 STREAMING
執行模式。應用於需要連續增量處理且有望無限期保持在線狀態的無限制作業。另外,有一個批處理式執行模式,我們稱為BATCH
執行模式。這以一種類似於批處理框架(如MapReduce)的方式執行作業。應用於具有已知固定輸入且不會連續運行的有邊界作業。
Apache Flink的流和批處理的統一方法意味着,無論配置的執行模式如何,在有界輸入上執行的DataStream應用程序都將產生相同的最終結果。重要的是要注意final在這里意味着什么:以STREAMING
模式執行的作業可能會產生增量更新(請考慮數據庫中的upsert),而BATCH
作業最后只會產生一個最終結果。如果正確解釋,最終結果將是相同的,但到達那里的方式可能會有所不同。
通過啟用BATCH
執行,我們允許Flink應用其他優化,只有當我們知道輸入是有限的時,我們才能做這些優化。
(1)配置BATCH執行模式
可以通過execution.runtime-mode
設置配置執行模式。有三個可能的值:
STREAMING
:經典的DataStream執行模式(默認)BATCH
:在DataStream API上以批處理方式執行AUTOMATIC
:讓系統根據源的有界性來決定
可以通過的命令行參數進行配置bin/flink run ...
,也可以在創建/配置時以編程方式進行配置StreamExecutionEnvironment
。
通過命令行配置執行模式的方法如下:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
此示例說明如何在代碼中配置執行模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);
5、Keyed DataStream(鍵控數據流)
(1)概念:
在flink中數據集為DataStream,對其進行分區時,會產生一個KeyedDataStream,然后允許使用Keyed DataStream的operator以及特有的state(如mapstate、valuestate等),keyby可以通過列下標選擇使用列,也可以選擇使用列名進行分區。
eg:
// some ordinary POJO
public class WC { public String word; public int count; public String getWord() { return word; } } DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words .keyBy(WC::getWord);
三、Flink DataSet API
Flink中的DataSet程序是常規程序,可對數據集進行轉換(例如,過濾,映射,聯接,分組)。最初從某些來源(例如,通過讀取文件或從本地集合)創建數據集。結果通過接收器返回,接收器可以例如將數據寫入(分布式)文件或標准輸出(例如命令行終端)。Flink程序可以在各種上下文中運行,獨立運行或嵌入其他程序中。執行可以在本地JVM或許多計算機的群集中進行。
public class WordCountExample { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.fromElements( "Who's there?", "I think I hear them. Stand, ho! Who's there?"); DataSet<Tuple2<String, Integer>> wordCounts = text .flatMap(new LineSplitter()) .groupBy(0) .sum(1); wordCounts.print(); } public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { for (String word : line.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
1、DataSet轉換
(1)數據轉換將一個或多個數據集轉換為新的數據集。程序可以將多種轉換組合成復雜的程序集。
Transformation | Description |
---|---|
Map | 一個輸入數據對應一個輸出數據,eg:將string轉換成integer類型返回data.map(new MapFunction<String, Integer>() { public Integer map(String value) { |
FlatMap | 一個輸入數據對應多個輸出數據,eg:將string分割成字符,通過collector以流的形式源源不斷輸出data.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String value, Collector<String> out) { for (String s : value.split(" ")) { out.collect(s); } } }); |
Filter | 過濾數據,保留過濾結果為true的部分。data.filter(new FilterFunction<Integer>() { public boolean filter(Integer value) { |
Reduce | 將一組數據合並為一個data.reduce(new ReduceFunction<Integer> { public Integer reduce(Integer a, Integer b) { |
Aggregate | Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set. You can also use short-hand syntax for minimum, maximum, and sum aggregations. |
Distinct | Returns the distinct elements of a data set. It removes the duplicate entries from the input DataSet, with respect to all fields of the elements, or a subset of fields. Distinct is implemented using a reduce function. You can specify the way that the runtime executes the combine phase of the reduce by supplying a CombineHint to setCombineHint . The hash-based strategy should be faster in most cases, especially if the number of different keys is small compared to the number of input elements (eg. 1/10). |
Join | Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys. You can specify the way that the runtime executes the join via Join Hints. The hints describe whether the join happens through partitioning or broadcasting, and whether it uses a sort-based or a hash-based algorithm. Please refer to the Transformations Guide for a list of possible hints and an example.If no hint is specified, the system will try to make an estimate of the input sizes and pick the best strategy according to those estimates. Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup. |
OuterJoin | Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a null value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys. |
Cross | Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters! It is advised to hint the system with the DataSet sizes by using crossWithTiny() and crossWithHuge(). |
Union | Produces the union of two data sets. |
First-n | Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys. |
(2)在元組的數據集上可以進行以下轉換:
Transformation | Description |
---|---|
Project | Selects a subset of fields from the tuples |
MinBy / MaxBy | Selects a tuple from a group of tuples whose values of one or more fields are minimum (maximum). The fields which are used for comparison must be valid key fields, i.e., comparable. If multiple tuples have minimum (maximum) field values, an arbitrary tuple of these tuples is returned. MinBy (MaxBy) may be applied on a full data set or a grouped data set. |
(3)指定key
某些轉換(join,coGroup,groupBy)要求在元素集合上定義鍵。其他轉換(Reduce,GroupReduce,Aggregate)允許在應用數據之前對數據進行分組。
DataSet<...> input = // [...]
DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
Flink的數據模型不是基於鍵值對。因此,無需將數據集類型實際打包到鍵和值中。key是“虛擬的”,定義為對實際數據的功能,用於分組操作。
(4)用戶定義的功能
實現接口方式
class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } }; data.map(new MyMapFunction());
匿名類方式
data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });
Java 8 Lambdas(Flink在Java API中還支持Java 8 Lambda)
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
所有需要用戶定義函數的轉換都可以將Rich()函數作為參數。
eg:
class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };
可以替換為以下寫法
class MyMapFunction extends RichMapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };
將函數照常傳遞給map
轉換:
data.map(new MyMapFunction());
也可以定義為匿名類:
data.map (new RichMapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
accumulators
首先,在要使用它的用戶定義的轉換函數中創建一個累加器對象(此處是一個計數器)。
private IntCounter numLines = new IntCounter();
其次,在rich函數的open()
方法中 注冊累加器對象。您還可以在此處定義名稱。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
在運算符函數中的任何位置(包括open()
和 close()
方法中)使用累加器。
this.numLines.add(1);
結果將存儲在JobExecutionResult
從execute()
執行環境的方法返回的對象中(僅在執行等待作業完成時才起作用)。
myJobExecutionResult.getAccumulatorResult("num-lines")
所有累加器為每個作業共享一個名稱空間。因此,可以在作業的不同操作功能中使用同一累加器。Flink將在內部合並所有具有相同名稱的累加器。
注:累加器的結果僅在整個作業結束后才可用。
自定義累加器:
要實現自己的累加器,只需要編寫累加器接口的實現即可。
若自定義累加器應隨Flink一起提供,則可以隨意創建拉取請求,可以選擇實現 Accumulator 或SimpleAccumulator。
1)Accumulator<V,R>
最靈活:它定義V
要添加的值的類型,並定義R
最終結果的結果類型。例如,對於直方圖,V
是一個數字,並且R
是一個直方圖。
2)SimpleAccumulator
適用於兩種類型相同的情況,例如計數器。
三、Operators
將一個或多個DataStream轉換為新的DataStream。程序可以將多種轉換組合成復雜的數據流拓撲。
1、DataStream Transformations
Transformation | Description |
---|---|
MapDataStream → DataStream |
輸入一個數據返回一個數據,eg:對輸入數據*2返回DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); |
FlatMapDataStream → DataStream |
數據一個數據以數據流的形式返回,eg:將字符串拆分為字符,通過collector以流的形式返回dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } }); |
FilterDataStream → DataStream |
過濾數據,保留結果為true的數據dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } }); |
KeyByDataStream → KeyedStream |
將dataStream轉換為keyedStream,原理:在邏輯上將流划分為不相交的分區,具有相同key的記錄都分配到同一個分區; 在內部,keyBy() 是通過哈希分區實現的。 dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey" dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
|
ReduceKeyedStream → DataStream |
減少keyedStream的滾動,將上一條reduced’的數據和當前數據合並,產生新的結果。keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } }); |
AggregationsKeyedStream → DataStream |
聚合函數keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key"); |
WindowKeyedStream → WindowedStream |
在已經分區的keyedStream上定義windows,windows根據某些特征對每個key中的數據進行分組。dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data |
WindowAllDataStream → AllWindowedStream |
在普通數據流上定義windows,windows根據某些特征對所有流事件進行分組(非並行轉換操作,所有記錄合並成一個任務進行windowAll操作)dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data |
Window ApplyWindowedStream → DataStreamAllWindowedStream → DataStream |
將通用函數應用於整個窗口,eg:手動對窗口元素求和。windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); |
Window ReduceWindowedStream → DataStream |
window上執行reduce()函數windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } }); |
Aggregations on windowsWindowedStream → DataStream |
聚合函數windowedStream.sum(0); windowedStream.sum("key"); windowedStream.min(0); windowedStream.min("key"); windowedStream.max(0); windowedStream.max("key"); windowedStream.minBy(0); windowedStream.minBy("key"); windowedStream.maxBy(0); windowedStream.maxBy("key"); |
UnionDataStream* → DataStream |
合並 data streams(如果dataStream和自身合並,則在結果中會出現兩次) |
Window JoinDataStream,DataStream → DataStream |
Join two data streams on a given key and a common window. |
Interval JoinKeyedStream,KeyedStream → DataStream |
Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound |
Window CoGroupDataStream,DataStream → DataStream |
Cogroups two data streams on a given key and a common window. |
ConnectDataStream,DataStream → ConnectedStreams |
"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams. |
CoMap, CoFlatMapConnectedStreams → DataStream |
Similar to map and flatMap on a connected data stream |
IterateDataStream → IterativeStream → DataStream |
Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. |
以下轉換用於tuples的dataStream上:
Transformation | Description |
---|---|
ProjectDataStream → DataStream |
Selects a subset of fields from the tuples |
2、物理分區
Flink還通過以下功能對轉換后的確切流分區進行了底層控制。
Transformation | Description |
---|---|
Custom partitioningDataStream → DataStream |
Uses a user-defined Partitioner to select the target task for each element.
|
Random partitioningDataStream → DataStream |
Partitions elements randomly according to a uniform distribution. |
Rebalancing (Round-robin partitioning)DataStream → DataStream |
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. |
BroadcastingDataStream → DataStream |
Broadcasts elements to every partition. |
3、任務鏈和資源組
鏈接兩個后續的轉換意味着將它們共同定位在同一線程內以獲得更好的性能。默認情況下Flink會鏈接運算符(例如,兩個后續的映射轉換),API可以對鏈接進行細粒度的控制。如果要在整個作業中禁用鏈接,使用StreamExecutionEnvironment.disableOperatorChaining()。
備注:這些函數只能在DataStream轉換后使用,因為它們引用的是先前的轉換。例如,可以使用someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。
資源組是Flink中的slot。
Transformation | Description |
---|---|
Start new chain | Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. |
Disable chaining | Do not chain the map operator |
Set slot sharing group | Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default"). |
4、Windows
Windows是處理無限流的核心。Windows將流分成有限大小的“存儲桶”。
(0)窗口化Flink程序的一般結構如下所示:
Keyed Windows(鍵控流)
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows(非鍵控流)
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
在上面,方括號([…])中的命令是可選的。Flink允許以多種不同方式自定義窗口邏輯,使其最切合需求。
(1)Window Assigners(窗口分配器)
窗口分配器定義了如何將元素分配給窗口,WindowAssigner
在window(...)
(鍵控流)或windowAll()
(非鍵控流)函數中調用。WindowAssigner
負責將每個傳入元素分配給一個或多個窗口。Flink 為最常見的用例提供了預定義的窗口分配器,即滾動窗口、 滑動窗口、會話窗口和全局窗口,還可以通過擴展WindowAssigner
類來實現自定義窗口分配器。所有內置窗口分配器(全局窗口除外)都根據時間將元素分配給窗口,時間可以是處理時間或事件時間。
基於時間的窗口通過開始時間戳(包括)和一個結束時間戳(不包括)表示窗口大小。在代碼中,FlinkTimeWindow
在處理基於時間窗口時使用,該窗口具有查詢開始和結束時間戳的方法maxTimestamp()
,以及返回給定窗口的最大允許時間戳。
下圖可視化描述每個分配器的工作原理。紫色圓圈代表流的元素,它們由某個key(在本例中為user 1、user 2和user 3)分區。x 軸顯示時間的進展。
(1.1)翻滾視窗
翻滾視窗分配器分配每一個元素到固定大小的窗口(滾動窗口具有固定的大小且不重疊)。
eg:如果指定大小為5分鍾的翻滾窗口,從當前窗口開始計算,每五分鍾將啟動一個新窗口,如下圖所示
代碼示例:
DataStream<T> input = ...; // tumbling event-time windows
input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // tumbling processing-time windows
input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // daily tumbling event-time windows offset by -8 hours.
input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
時間間隔可以通過Time.milliseconds(x)
,Time.seconds(x)
, Time.minutes(x)
等添加。如上面最后一個示例,滾動窗口分配器采用offset(可選)
參數,用於更改窗口的對齊方式。若沒有offsets ,則時間滾動窗口與epoch對齊,即1:00:00.000 - 1:59:59.999
,2:00:00.000 - 2:59:59.999等;如果offset設置為
15分鍾,則得到如 1:15:00.000 - 2:14:59.999
,2:15:00.000 - 3:14:59.999
等,即offset可以用來調整窗口時區為UTC-0以外的時區,如采用中國時區,必須指定的偏移量Time.hours(-8)
。
(1.2)滑動窗
類似於滾動窗口分配器,窗口的大小由窗口大小參數配置,同時窗口滑動參數控制滑動窗口啟動的頻率。因此,如果slide參數小於窗口大小,則滑動窗口可能會重疊。在這種情況下,元素被分配給多個窗口。
例如,可以將大小為10分鍾的窗口滑動5分鍾。這樣,每隔5分鍾就會得到一個窗口,其中包含最近10分鍾內到達的事件,如下圖所示:
DataStream<T> input = ...; // sliding event-time windows
input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // sliding processing-time windows
input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // sliding processing-time windows offset by -8 hours
input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>);
(1.3)會話窗口
與滾動窗口和滑動窗口相比,會話窗口不重疊且沒有固定的開始和結束時間。相反,當會話窗口在一定時間段內未收到元素時(即不活動間隙),它將關閉。會話窗口分配器可與靜態配置會話間隙或與會話間隙函數(指定不活動周期)使用。當此時間段到期時,當前會話將關閉,隨后的元素將分配給新的會話窗口。
DataStream<T> input = ...; // event-time session windows with static gap
input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // event-time session windows with dynamic gap
input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap
})) .<windowed transformation>(<window function>); // processing-time session windows with static gap
input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // processing-time session windows with dynamic gap
input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap
})) .<windowed transformation>(<window function>);
靜態間隙可以通過Time.milliseconds(x)
,Time.seconds(x)
, Time.minutes(x)
等設置。動態間隙是通過實現SessionWindowTimeGapExtractor
接口指定的。
注意由於會話窗口沒有固定的開始和結束,在內部,會話窗口運算符會為每個到達的記錄創建一個新窗口,如果窗口彼此之間比已定義的間隔小,則將它們進行merge操作。(merge操作,會話窗口操作需要一個merge觸發器以及merge的window函數,如ReduceFunction
,AggregateFunction
或ProcessWindowFunction)
(1.4)全局窗口
全局窗口分配器對同單個窗口分配相同的key元素。當指定了自定義觸發器時,此窗口schema才有用。否則,將不會執行任何計算,因為當執行聚合函數時,全局窗口不會自動結束。
DataStream<T> input = ...; input .keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>);
2、Window Functions
(2.1)Reduce功能
ReduceFunction
指定如何將輸入中的兩個元素組合在一起以產生相同類型的輸出元素。Flink使用aReduceFunction
來逐步聚合窗口的元素。
eg:
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .reduce(new ReduceFunction<Tuple2<String, Long>> { public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) { return new Tuple2<>(v1.f0, v1.f1 + v2.f1); } });
(2.2)聚合函數
一個AggregateFunction
是一個一般化版本ReduceFunction
,其具有三種類型:輸入類型(IN
),蓄壓式(ACC
),和一個輸出類型(OUT
)。輸入類型是輸入流中元素的類型,並且AggregateFunction
具有將一個輸入元素添加到累加器的方法。該接口還具有創建初始累加器,將兩個累加器合並為一個累加器以及OUT
從累加器提取輸出(類型)的方法。我們將在下面的示例中看到它的工作原理。
與ReduceFunction一樣
,Flink將在窗口輸入元素到達時增量地聚合它們。
一個AggregateFunction
可以被定義並這樣使用:
/** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */
private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2<Long, Long> accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate());
(2.3)ProcessWindowFunction
ProcessWindowFunction獲得一個Iterable,該Iterable包含窗口的所有元素,以及一個可以訪問時間和狀態信息的Context對象,這使其能夠比其他窗口函數提供更大的靈活性。這是以性能和資源消耗為代價的,因為不能增量聚合元素,而是需要在內部對其進行緩沖,直到認為該窗口已准備好進行處理為止。
ProcessWindowFunction
:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */
public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** * The context holding window metadata. */
public abstract class Context implements java.io.Serializable { /** * Returns the window that is being evaluated. */
public abstract W window(); /** Returns the current processing time. */
public abstract long currentProcessingTime(); /** Returns the current event-time watermark. */
public abstract long currentWatermark(); /** * State accessor for per-key and per-window state. * * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */
public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */
public abstract KeyedStateStore globalState(); } }
注意:Tuple
必須手動將其強制轉換為正確大小的元組以提取key字段。
ProcessWindowFunction
可以定義成這樣使用(ProcessWindowFunction
計算窗口中元素的方法):
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(t -> t.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new MyProcessWindowFunction()); /* ... */
public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> { @Override public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) { long count = 0; for (Tuple2<String, Long> in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); } }
注意將ProcessWindowFunction
簡單的聚合(例如count)效率很低。
(2.4)具有增量聚合的ProcessWindowFunction
ProcessWindowFunction
可與一組合ReduceFunction
,或AggregateFunction
因為它們在窗口到達逐步聚合的元件。窗口關閉后,ProcessWindowFunction
將提供匯總結果。這樣一來,它便可以遞增地計算窗口,同時可以訪問的其他窗口元信息ProcessWindowFunction
。
注意也可以使用舊版WindowFunction
而不是 ProcessWindowFunction
用於增量窗口聚合。
具有ReduceFunction的增量窗口聚合
eg:如何將增量ReduceFunction
與ProcessWindowFunction組合
以返回窗口中的最小事件以及該窗口的開始時間:
DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> { public SensorReading reduce(SensorReading r1, SensorReading r2) { return r1.value() > r2.value() ? r2 : r1; } } private static class MyProcessWindowFunction extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { public void process(String key, Context context, Iterable<SensorReading> minReadings, Collector<Tuple2<Long, SensorReading>> out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min)); } }
具有AggregateFunction的增量窗口聚合
eg:如何將增量AggregateFunction
與一個組合ProcessWindowFunction
以計算平均值,並與平均值一起發出鍵和窗口:
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); // Function definitions
/** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */
private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2<Long, Long> accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } private static class MyProcessWindowFunction extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> { public void process(String key, Context context, Iterable<Double> averages, Collector<Tuple2<String, Double>> out) { Double average = averages.iterator().next(); out.collect(new Tuple2<>(key, average)); } }
3、Triggers
Trigger
確定窗口(由窗口分配器形成)何時准備好由窗口函數處理。每個WindowAssigner
都有一個默認值Trigger
。如果默認觸發器不符合您的需求,則可以使用指定自定義觸發器trigger(...)
。
觸發器接口具有五種方法,它們允許aTrigger
對不同事件做出反應:
-
onElement()
對於添加到窗口中的每個元素,都會調用該方法。 -
onEventTime()
當注冊的事件時間計時器觸發時,將調用該方法。 -
onProcessingTime()
當注冊的處理時間計時器觸發時,將調用該方法。 -
該
onMerge()
方法與有狀態觸發器相關,並且在兩個觸發器的相應窗口合並時(例如,在使用會話窗口時)合並兩個觸發器的狀態。 -
最后,該
clear()
方法執行刪除相應窗口后所需的任何操作。
關於上述方法,需要注意兩件事:
1)前三個通過返回來決定如何對它們的調用事件采取行動TriggerResult
。該動作可以是以下之一:
CONTINUE
: 沒做什么FIRE
:觸發計算PURGE
:清除窗口中的元素FIRE_AND_PURGE
:觸發計算並隨后清除窗口中的元素。
2)這些方法中的任何一種均可用於以后operator注冊(處理或時間)事件計時器。
4、允許遲到
(1)定義
當使用事件時間窗口時,元素可能會延遲到達,即Flink 用來跟蹤事件時間進度的水印已經超過了元素所屬窗口的結束時間戳。默認情況下,當水印超過窗口末尾時,將刪除后期元素。但是,Flink 允許為窗口操作符指定最大允許延遲。Allowed lateness 指定元素在被丟棄之前可以延遲多長時間,其默認值為 0。 在 watermark 已經通過窗口末尾之后但在它通過窗口末尾之前到達的元素加上允許的延遲,仍然添加到窗口中。根據使用的觸發器,延遲但未丟棄的元素可能會導致窗口再次觸發。對於EventTimeTrigger
.為了完成這項工作,Flink 會保持窗口的狀態,直到它們允許的延遲到期。一旦發生這種情況,Flink 將移除窗口並刪除其狀態。
默認情況下,允許的延遲設置為 0,即
到達水印之后的元素將被丟棄。
允許的延遲代碼示例:
DataStream<T> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .<windowed transformation>(<window function>);
注意:當使用GlobalWindows
窗口分配器時,沒有數據被認為是延遲的,因為全局窗口的結束時間戳是Long.MAX_VALUE
。
(2)獲取延遲數據作為側流輸出
使用Flink的側流輸出功能,可以獲取最近被丟棄的數據流。
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; DataStream<T> input = ...; SingleOutputStreamOperator<T> result = input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>); DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
5、狀態大小注意事項:
Windows可以定義很長時間(例如幾天,幾周或幾個月),因此會累積很大的狀態。在估算窗口計算的存儲需求時,有以下規則:
-
Flink為每個窗口所屬的每個元素創建一個副本。鑒於此,滾動窗口保留每個元素的一個副本(一個元素恰好屬於一個窗口,除非被延遲放置)。相反,滑動窗口會為每個元素創建多個。因此,並不推薦大小為1天的滑動窗口和滑動1秒的滑動窗口。
-
ReduceFunction和
AggregateFunction極大地減少了存儲需求,因為它們聚合了元素,且每個窗口僅存儲一個值。相反,使用ProcessWindowFunction
需要累積所有元素。 -
使用
Evictor
防止了任何預聚合,作為窗口的所有元件必須通過evictor()施加的計算。
6、WindowAssigners 的默認觸發器
默認Trigger
的WindowAssigner
是適用於很多情況。例如,所有事件時間窗口分配器都有一個EventTimeTrigger作為
默認觸發器。一旦水印通過窗口的末尾,這個觸發器就會觸發。
注意:
(1)GlobalWindow默認觸發器是NeverTrigger(
從不觸發),使用全局窗口時需要自定義一個觸發器。
(2)使用指定觸發器trigger(),
將覆蓋WindowAssigner的默認觸發器
。
四、Side Outputs(側面輸出流)
除了DataStream
操作產生的主流之外,還可以附加產生任意數量的側面輸出流。側流中的數據類型不必與主流中的數據類型匹配,並且不同側輸出的類型也可以不同。拆分數據流時,通常必須復制該流,然后從每個流中過濾掉不需要的數據。
定義一個OutputTag
用於標識側面輸出流的:
// 這需要是一個匿名的內部類,以便我們分析類型 OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
可以通過以下功能將數據發送到側面輸出:
-
過程功能
-
KeyedProcessFunction
-
協同處理功能
-
KeyedCoProcessFunction
-
ProcessWindowFunction
-
ProcessAllWindowFunction
使用上述方法中向用戶暴露Context
參數,將數據發送到由 OutputTag
標識的側流中。
eg:從 ProcessFunction
發送數據到側流輸出
DataStream<Integer> input = ...; final OutputTag<String> outputTag = new OutputTag<String>("side-output"){}; SingleOutputStreamOperator<Integer> mainDataStream = input .process(new ProcessFunction<Integer, Integer>() { @Override public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception { // emit data to regular output
out.collect(value); // emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value)); } });
在 DataStream
運算結果上使用 getSideOutput(OutputTag)
方法獲取旁路輸出流,會產生一個與側面輸出流結果類型一致的 DataStream。
eg:
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){}; SingleOutputStreamOperator<Integer> mainDataStream = ...; DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
flink官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/concepts/index.html
借鑒了不少文章,感謝各路大神分享,如需轉載請注明出處,謝謝:https://www.cnblogs.com/huyangshu-fs/p/14489114.html