- Features
- Use Case
- Flink (latest version 1.10) vs Spark (latest version 2.4.5)
- Architecture
- Execution Modes
- Layered APIs & Component Stack
- DataStream Example
- DataSet Example
- State
- Time, Watermark, Late Data
- Windows
- Checkpoint
- DataStream Sources, Transformations, Sinks
- DataStream Joins
- Process Function
- Async I/O
- DataSet Sources, Transformations, Sinks
- Table API & SQL
- Library CEP
- Library Gelly
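Features
- Distributed, parallel computing framework
- Powerful state management with support for stateful computation
- Supports both stream and batch processing; streaming is the foundation, and batch processing is treated as a special case of stream processing
- Supports processing based on the event time carried by the data itself, with a Watermark mechanism
- Provides powerful window mechanisms based on time, count, and session
- Provides an exactly-once guarantee
- Provides checkpoints and savepoints to support failure recovery, upgrades, and rescaling
- Provides the DataStream API for stream processing and the DataSet API for batch processing, with SQL and the Table API on top as a unified interface for both
- Implemented in Java, with Scala support; Python is supported at the Table API and SQL level
- In-memory computation, low latency, high throughput, high availability, scalable; supports thousands of cores and TB-scale state
- Multiple deployment options: Standalone Cluster, Yarn, Mesos, K8S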
Use Case
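- Event-Driven Applications: event-driven programs can react in real time as soon as one or more records arrive, without waiting (Spark, by comparison, cannot do this: it has to wait until a batch of data has arrived before responding, so its real-time behavior is weaker). Suitable for applications that need a real-time response, such as Fraud detection, Anomaly detection, Rule-based alerting, and monitoring
- Data Analytics Applications: batch analysis over bounded data and streaming analysis over unbounded data (ingesting data, analyzing it, and updating results in real time, continuously and incrementally). Flink provides a SQL interface that unifies operations on batch and streaming data
- Data Pipeline Applications: similar to ETL (cleaning, enriching, and loading data). The difference is that traditional ETL starts a program periodically, whereas a Flink program can run continuously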
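Flink (latest version 1.10) vs Spark (latest version 2.4.5)
- Flink advantages: state management (Spark only has checkpoints); a powerful window mechanism (Spark only supports time windows, not event windows or session windows); stronger real-time behavior (Spark uses micro-batches, so its latency is higher and it cannot respond to individual events in real time, while Flink is natively built on data streams / event streams); a better exactly-once implementation than Spark
- Spark advantages: better unification of stream and batch, stronger batch processing, more investment in Machine Learning, and a native Scala implementation with Java/Python support (Flink is native Java with Scala support and only supports Python at the Table API and SQL level; Scala is more concise than Java)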
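Architecture
JobManagers
Also called masters. At least one is required, and several can be configured for HA. They manage the cluster, schedule tasks, coordinate distributed execution, and coordinate checkpoints and failure recovery. A JobManager has 3 main components
- ResourceManager: manages and provisions the resources of the Flink cluster and manages the task slots of the TaskManagers. There is a different implementation for each deployment mode (Standalone, Yarn, Mesos, K8S); in Standalone mode the JobManager cannot start new TaskManagers on demand
- Dispatcher: provides a REST interface through which Clients submit Flink programs, starts a JobMaster for each submitted Job, and serves the WebUI for querying Job information
- JobMaster: manages the execution of a single JobGraph. Several Jobs can run at the same time, each with its own JobMaster
TaskManagers
Also called workers. At least one is required. They execute the tasks of a dataflow and buffer and exchange data. TaskManagers establish connections to the JobManagers, report their status, and accept task assignments
A TaskManager is a JVM process that manages one or more task slots; the number of slots is the maximum number of tasks it can accept concurrently. Each task runs in its own thread, and each slot reserves memory that other slots cannot take. Currently a slot reserves only memory, not other resources such as CPU. Tasks in the same TaskManager can share TCP connections and heartbeat messages, and may also share data sets and data structures, which reduces per-task overhead
By default Flink lets subtasks of different tasks share a slot as long as they belong to the same Job, so a single slot may hold an entire pipeline of that Job, which improves resource utilization
Ideally the number of task slots equals the number of CPU cores
Client
Prepares a job and submits it to the JobManager with the flink run command. It can then exit, or stay connected to receive status updates from the JobManager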
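Execution Modes
- Flink Session Cluster: a long-running cluster with pre-allocated resources; all clients submit Jobs to the same cluster. Drawbacks: Jobs compete for resources, and a JobManager or TaskManager failure affects every Job. Advantage: fast startup, which suits programs that are started frequently, run briefly, and need to respond quickly
- Flink Job Cluster: each Job gets its own cluster. The Job is submitted to an external system such as Yarn or K8S, which first allocates resources; the JobManager is started with those resources, TaskManagers are started according to the Job's needs, and then the program runs. Advantage: Jobs are isolated from each other. Drawbacks: slow startup and no unified management. Suits long-running programs that are not sensitive to startup time
- Flink Application Cluster: the application and its dependencies are packaged into one runnable Jar and run directly; there is no need to start a cluster first and then submit a Job, the program starts in a single step
- Self-contained Flink Applications: there are efforts in the community towards fully enabling Flink-as-a-Library in the future.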
Layered APIs & Component Stack
Flink provides APIs at different levels of abstraction
DataStream Example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
// The env is initialized differently from the DataSet API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
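For more information, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html
DataSet Example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountExample {
public static void main(String[] args) throws Exception {
// The env is initialized differently from the DataStream API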
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
wordCounts.print();
}
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
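For more information, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/
State
A stream processing program that has to look at more than one record to produce a result is usually stateful: it must remember some value or intermediate result. Take a program that monitors drivers in real time and must raise an alert as soon as someone has driven for 3 hours without a break. It has to keep state per driver, keyed by driver, holding the time from which driving is counted, i.e. (Driver, StartTime). When a new record shows that the driver is not driving, StartTime is updated to the current time; if the driver is driving, the current time is compared with StartTime, and an alert is sent once more than 3 hours have passed. Flink manages and persists this state automatically, and if the program fails and restarts it restores the state, so processing resumes from the data being read at the time of the failure instead of replaying and recomputing history. Flink's state management simplifies application code and lets the application focus on business logic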
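The driver-monitoring logic described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the HashMap stands in for Flink keyed state, so nothing here is checkpointed or restored the way real Flink state would be.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the (Driver, StartTime) logic; not Flink API.
final class DriverMonitorSketch {
    static final long LIMIT_MS = 3L * 60 * 60 * 1000; // 3 hours

    // Stand-in for keyed state: driver id -> StartTime of the current driving stretch.
    private final Map<String, Long> startTime = new HashMap<>();

    /** Processes one reading and returns true if an alert should be sent. */
    boolean onReading(String driver, boolean driving, long nowMs) {
        Long start = startTime.get(driver);
        if (!driving || start == null) {
            // Not driving, or first reading for this driver: restart the clock.
            startTime.put(driver, nowMs);
            return false;
        }
        // Driving: alert once more than 3 hours have passed since StartTime.
        return nowMs - start > LIMIT_MS;
    }
}
```

Each driver has an independent entry, which mirrors how keyed state is scoped to the current key.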
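Flink's state management features include:
- Multiple State Primitives: state of several data types is supported, including atomic values, lists, maps, and so on
- Pluggable State Backends: the state backend manages state and determines how large state can grow, whether snapshots are asynchronous or synchronous, and how and where state is stored. The State Backend for all Jobs can be changed through configuration, or each Job can choose its own in code. Flink provides these State Backends:
1) MemoryStateBackend (default, for small state or testing): on Checkpoint, state is saved to the JobManager's memory;
2) FsStateBackend: on Checkpoint, state is saved to a file system;
3) RocksDBStateBackend: working state is kept in RocksDB on local disk at runtime, and on Checkpoint the RocksDB data is written to the configured file system; only asynchronous snapshots are supported;
4) User-defined State Backends
For more information, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html
- Exactly-once state consistency: Checkpoints and the recovery mechanism keep state consistent, making failure handling transparent to the program
- Very Large State: with asynchronous snapshots and incremental checkpoints, a Flink Job can maintain TB-scale state
- Scalable Applications: Flink supports scaling of stateful applications by redistributing the state to more or fewer workers.
Flink has two basic kinds of State
- Keyed State: state tied to a key. It can only be used in functions and operators on a KeyedStream. Each Keyed State is bound to one combination of Operator and Key. Keyed State is organized into Key Groups, and Key Groups are the unit in which Flink redistributes Keyed State
- Operator State: state bound to one parallel instance of an operator. The Kafka Connector is an example: each parallel instance of the Kafka consumer keeps an Operator State holding a map of topic partitions to offsets
State exists in two forms
- Managed: the data structure is known to Flink (for example a list) and can be handed over to Flink entirely
- Raw: the data structure is user-defined and opaque to Flink, which sees only bytes; the operator has to handle it itself
Managed state types include:
- ValueState
- ListState
- ReducingState
- AggregatingState<IN, OUT>
- FoldingState<T, ACC>
- MapState<UK, UV>
For more information, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html
KeyedState example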
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
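// The first is (3+5)/2 and the second is (7+4)/2 (integer division); the single remaining element does not trigger an output
// All elements here share one key; with different keys the aggregation is per key, because sum is automatically scoped to the current key
Operator State example (state lets a sink operator recover its buffered elements after a failure)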
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
// Difference from keyed state: initialization goes through getOperatorStateStore here, while the state descriptor can be defined the same way
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
The State Backend can be set in flink-conf.yaml
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
It can also be set per Job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
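State can also be given a TTL; expired state is cleaned up
Time, Watermark, Late Data
Flink streaming programs can operate (for example in Window operations) on different notions of time
- Event Time: the time carried by the data itself
- Ingestion Time: the system time at which Flink receives the data
- Processing Time (default): the system time at which the operation is executed
Specifying the time characteristic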
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
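If nothing is specified, the default is Processing Time
Window computations may see out-of-order and delayed data, i.e. data belonging to a window keeps arriving after the window has ended. Flink uses Watermarks to specify how much delay is tolerated and to control when a window computation is triggered
Flink allows a custom timestamp field and custom Watermark generation. A window computation is triggered when:
- Watermark time >= the window's end time
- The window's time range contains data
For example, with a window size of 10 minutes, the (0,10) window fires once the Watermark reaches 10
Flink provides the unified DataStream.assignTimestampsAndWatermarks() method to extract event time and generate Watermarks
assignTimestampsAndWatermarks() accepts
- AssignerWithPeriodicWatermarks
- AssignerWithPunctuatedWatermarks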
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
// MyTimestampsAndWatermarks is a user-written implementation of AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
AssignerWithPeriodicWatermarks generates Watermarks periodically
The default interval is 200ms; a different interval can be set with ExecutionConfig.setAutoWatermarkInterval()
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(200);
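Each time a Watermark is generated, the getCurrentWatermark() function of the AssignerWithPeriodicWatermarks is called
Timestamps are obtained through its extractTimestamp function
// This custom class uses the maximum timestamp seen across all received elements, minus maxOutOfOrderness, as the Watermark
// Suppose the window size is 10 minutes and maxOutOfOrderness is 3 minutes; then
// the (0,10) window fires only once an element with a timestamp of 13 or later has been received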
public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
// Extract the timestamp of every incoming element and use it to update the Watermark; MyEvent is a user-defined data class
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// getCurrentWatermark is called periodically
// If the returned Watermark has reached the end time of a window, that window's computation is triggered
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
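The rule in the comments above (Watermark = largest timestamp seen minus maxOutOfOrderness, and a window fires once the Watermark reaches its end) can be traced with a small plain-Java sketch. The class and method names are hypothetical and this is not Flink API; the firing check uses the simplified "Watermark >= window end" condition from these notes.

```java
// Plain-Java sketch of the bounded-out-of-orderness rule; not Flink API.
final class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Records the event time of one element (any unit, as long as it is consistent). */
    void onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Watermark = largest event time seen so far minus the tolerated disorder. */
    long currentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentMaxTimestamp - maxOutOfOrderness;
    }

    /** Simplified firing rule used in these notes: watermark >= window end. */
    boolean windowFires(long windowEnd) {
        return currentWatermark() >= windowEnd;
    }
}
```

With maxOutOfOrderness = 3 and a window ending at 10 (both in minutes), an element at 12 leaves the Watermark at 9 and the window closed; an element at 13 moves the Watermark to 10 and the window fires.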
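Flink provides several built-in AssignerWithPeriodicWatermarks classes
// AscendingTimestampExtractor
// If a new element's timestamp is not lower than the current one, it becomes the current timestamp and the Watermark follows it (timestamp - 1); otherwise the violation handler is called
// A custom extractAscendingTimestamp function is needed to extract the timestamp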
public abstract long extractAscendingTimestamp(T element);
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
@Override
public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
// BoundedOutOfOrdernessTimestampExtractor
// The Watermark is the largest timestamp seen so far minus a threshold maxOutOfOrderness
// A custom extractTimestamp is needed to extract the element's timestamp
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
public abstract long extractTimestamp(T element);
@Override
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
// IngestionTimeExtractor
// Generates Watermarks from the system time
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return now;
}
@Override
public Watermark getCurrentWatermark() {
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return new Watermark(now - 1);
}
AssignerWithPunctuatedWatermarks produces punctuated Watermarks: every time an element arrives, it decides whether a Watermark should be emitted immediately
public class MyTimestampsAndWatermarks implements AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
// checkAndGetNextWatermark is called right after extractTimestamp
return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
There is no built-in class for punctuated Watermarks
If a Window has several input sources, each with its own Watermark, the smallest one is used
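A minimal plain-Java sketch of the "take the smallest Watermark" rule, assuming a hypothetical operator that remembers the last Watermark of each input and never lets its own Watermark move backwards. The names are illustrative and not Flink API.

```java
import java.util.Arrays;

// Plain-Java sketch: an operator's watermark is the minimum over its inputs.
final class MinWatermarkSketch {
    private final long[] perInput;            // last watermark seen on each input
    private long emitted = Long.MIN_VALUE;    // watermark of the operator itself

    MinWatermarkSketch(int numInputs) {
        perInput = new long[numInputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    /** Input `input` reports watermark `watermark`; returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : perInput) {
            min = Math.min(min, w);
        }
        emitted = Math.max(emitted, min);
        return emitted;
    }
}
```

The slowest input therefore holds back every window downstream, which is why an idle source can stall event-time processing.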
To handle data that arrives after the Watermark has already triggered the window, there are generally two approaches
- Allowed Lateness
- Side Outputs
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(Time.seconds(30))
.<windowed transformation>(<window function>);
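allowedLateness(Time.seconds(30)) keeps the window for another 30s after it has fired instead of destroying it; data belonging to the window that arrives during this period can still be included in a computation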
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(Time.seconds(30))
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
sideOutputLateData(lateOutputTag) diverts data that arrives after the window is finished into a separate stream, where it can be processed on its own
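How the two mechanisms combine can be sketched as a three-way decision for each arriving element, based on the current Watermark. This is a simplified plain-Java model under the "Watermark >= window end" rule used in these notes; the enum and method names are hypothetical, not Flink API.

```java
// Plain-Java sketch of what happens to an element of a given window.
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        if (watermark < windowEnd) {
            return Outcome.ON_TIME;            // window has not fired yet
        }
        if (watermark < windowEnd + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED;  // window is still kept, fires again
        }
        return Outcome.SIDE_OUTPUT;            // window is gone, element is diverted
    }
}
```

Without sideOutputLateData, elements in the last category would simply be dropped.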
Debugging Watermarks:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_event_time.html
Windows
There are broadly two kinds of Windows
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
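As shown, the main difference is .keyBy(...).window(...) versus .windowAll(...)
For keyed streams, the computation is carried out by multiple parallel tasks, and elements with the same key are processed by the same task
For non-keyed streams, all computation is carried out by a single task
Window Assigners determine how elements are assigned to Windows
Flink predefines several common window assigners: tumbling windows, sliding windows, session windows, global windows
All of them except global windows are time-based, on either processing time or event time
Flink also allows custom window assigners, implemented by extending the WindowAssigner class
Tumbling Windows: fixed window size, with no overlap between windows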
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) // the offset adjusts for the time zone (UTC+8 here)
.<windowed transformation>(<window function>);
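To see what the size and offset arguments do, the start of the tumbling window containing a timestamp can be computed with modular arithmetic. The helper below is a plain-Java sketch with hypothetical names; it is assumed to be equivalent to what the tumbling assigners do, but it is not taken from Flink's code.

```java
// Plain-Java sketch: start of the tumbling window that contains `timestamp`.
final class TumblingWindowSketch {
    /** All arguments in milliseconds; the window is [start, start + size). */
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative, also for negative inputs.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long eightHours = 8L * 60 * 60 * 1000;
        // 5-second windows without offset
        System.out.println(windowStart(12_345, 0, 5_000));
        // daily windows shifted by -8 hours, evaluated at epoch 0
        System.out.println(windowStart(0, -eightHours, day));
    }
}
```

With a daily window and an offset of -8 hours, windows start at 16:00 UTC, which is midnight in UTC+8.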
Sliding Windows: fixed window size, and windows may overlap; for example, with size 20 and slide 5, two consecutive windows are (0,20) and (5,25)
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
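Because sliding windows overlap, one element belongs to size / slide windows. The plain-Java sketch below lists the start times of all windows that contain a timestamp; the names are hypothetical, no offset is applied, and the code is not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: which sliding windows does one element fall into?
final class SlidingWindowSketch {
    /** Returns the starts of all windows [start, start + size) containing `timestamp`. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For size 10 and slide 5, an element at time 12 lands in the windows starting at 10 and at 5, so every window function sees it twice.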
Session Windows: no fixed window size; when no new data arrives for a certain period, the data received so far is closed off as one window
DataStream<T> input = ...;
// event-time session windows with static gap (the window closes after 10 minutes without data)
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap (custom gap per element)
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap (the window closes after 10 minutes without data)
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap (custom gap per element)
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
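The grouping performed by a static-gap session window can be illustrated with a plain-Java sketch for the timestamps of a single key. The names are hypothetical, and the exact boundary behavior (here a pause strictly longer than the gap starts a new session) is an assumption of this sketch rather than a statement about Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: split the sorted timestamps of one key into sessions.
final class SessionWindowSketch {
    static List<List<Long>> sessions(long[] sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0;
        for (long ts : sortedTimestamps) {
            // A pause longer than `gap` closes the current session.
            if (current == null || ts - previous > gap) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(ts);
            previous = ts;
        }
        return result;
    }
}
```

With a gap of 10, the timestamps 1, 3, 4, 20, 22, 50 form three sessions: [1, 3, 4], [20, 22], and [50].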
Global Windows: all elements with the same key go into a single window; a custom Trigger is required (for example, fire after a given number of elements)
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(size))
.<windowed transformation>(<window function>);
Flink also wraps these in the convenience functions timeWindow and countWindow
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
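If there are special requirements, define a custom WindowAssigner subclass
Triggers determine when a Window fires
Each Trigger mainly has the following functions
- onElement: called when an element is added to the window
- onEventTime: called when an event-time timer fires
- onProcessingTime: called when a processing-time timer fires
- onMerge: called when windows are merged
- clear: called when the window is cleaned up
The first three return a TriggerResult that tells Flink what to do with the window:
- CONTINUE: do nothing
- FIRE: trigger the window computation
- PURGE: clear the window contents
- FIRE_AND_PURGE: trigger the computation and then clear the window
The built-in Triggers only FIRE and never PURGE, apart from the PurgingTrigger wrapper listed below
Flink's built-in Triggers
- EventTimeTrigger: uses event time as the window time; fires when the watermark passes the window's end time
- ProcessingTimeTrigger: uses processing time as the window time; fires when the system clock passes the window's end time
- CountTrigger: fires when the number of elements in the window reaches the given limit
- PurgingTrigger: a wrapper around another Trigger; when the wrapped Trigger returns FIRE, the result is changed to FIRE_AND_PURGE
For special needs, define a custom Trigger subclass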
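The relationship between a count-based trigger and the purging wrapper can be modeled in a few lines of plain Java. The interface and class names below are hypothetical and much smaller than Flink's Trigger API (no timers, no window state); the sketch only shows how FIRE is turned into FIRE_AND_PURGE.

```java
// Plain-Java model of trigger results; not Flink's Trigger API.
final class TriggerSketch {
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    interface Trigger {
        Result onElement();
    }

    /** Fires once `maxCount` elements have arrived, then starts counting again. */
    static final class CountSketch implements Trigger {
        private final long maxCount;
        private long count;

        CountSketch(long maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public Result onElement() {
            count++;
            if (count >= maxCount) {
                count = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }
    }

    /** Wraps another trigger and upgrades FIRE to FIRE_AND_PURGE. */
    static final class PurgingSketch implements Trigger {
        private final Trigger inner;

        PurgingSketch(Trigger inner) {
            this.inner = inner;
        }

        @Override
        public Result onElement() {
            Result result = inner.onElement();
            return result == Result.FIRE ? Result.FIRE_AND_PURGE : result;
        }
    }
}
```

This is the combination behind countWindow(size): the window is evaluated every size elements and its contents are discarded afterwards.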
Window Functions
- ReduceFunction: takes two inputs and produces one output of the same type
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
- AggregateFunction: an aggregate function with three type parameters: input type, accumulator (intermediate value) type, and output type
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
// add updates the accumulator with each input element
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
// getResult computes the output value from the accumulator
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
// merge combines two accumulators
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
- FoldFunction: updates the output directly with each input element
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>() {
public String fold(String acc, Tuple2<String, Long> value) {
// Builds a new acc from the previous acc and the input element; the final acc is the output, and its initial value is the first argument of fold
return acc + value.f1;
}
});
- ProcessWindowFunction: the most flexible option, with access to all elements of the window, at a higher cost in performance and resources
DataStream<Tuple2<String, Long>> input = ...;
input.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + " count: " + count);
}
}
ProcessWindowFunction can also be combined with ReduceFunction, AggregateFunction, or FoldFunction
DataStream<SensorReading> input = ...;
input.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
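// With reduce alone, the reduce result is emitted directly when the window ends
// Combined with a ProcessWindowFunction, the reduce result is passed on to the ProcessWindowFunction for further processing
// Here the window's start time is added to the output on top of the reduce result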
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
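In some places the legacy WindowFunction can still be used
Evictors
Remove elements from a window before or after the window function runs; the Evictor interface mainly consists of two methods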
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
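Flink has three built-in Evictors
- CountEvictor: keeps the given number of most recent elements and discards the rest; see the sliding count window countWindow example above
- DeltaEvictor: takes a custom DeltaFunction and a threshold; elements whose delta to the last element exceeds the threshold are discarded
- TimeEvictor: finds the maximum timestamp max_ts among all elements and removes elements whose timestamp is less than max_ts - interval
By default the built-in Evictors all run before the window function
Flink does not guarantee the order of elements within a Window, i.e. elements that arrived earlier are not necessarily at the front of the window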
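The TimeEvictor rule described above can be sketched in plain Java on a list of timestamps. The names are hypothetical, and the boundary handling follows the wording of these notes (only timestamps strictly less than max_ts - interval are removed), which may differ from Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of time-based eviction; not Flink API.
final class TimeEvictorSketch {
    /** Returns the timestamps that survive eviction, in their original order. */
    static List<Long> retain(List<Long> timestamps, long interval) {
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts >= maxTs - interval) {   // older than the cutoff: evicted
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

Since the order of elements in a window is not guaranteed, the sketch scans for the maximum first instead of assuming the last element is the newest.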
Allowed Lateness & Side Output
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(Time.seconds(30))
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
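As mentioned in the Watermark section, the window is kept for a while after it has fired; if more data belonging to it arrives, a new computation can be triggered, and late data can also be diverted to a separate stream for processing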
Checkpoint
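Checkpoints periodically persist the positions up to which the data sources have been consumed, together with the application's own state (see the State section above)
Prerequisites for Checkpointing
- A data source that retains data for some time and can be re-read from a given position, such as a message queue (Kafka, RabbitMQ, Amazon Kinesis, Google PubSub, etc.) or a file system (HDFS, S3, GFS, NFS, Ceph, etc.)
- A persistent storage system, usually a distributed file system such as HDFS, S3, GFS, NFS, Ceph, etc.
Checkpointing is disabled by default and has to be enabled in code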
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
More related settings for conf/flink-conf.yaml are listed at
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/checkpointing.html#related-config-options
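Enabling Checkpointing for Iterative Jobs (see the Iterate Transformation section) requires one extra parameter, and data may be lost
env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, true); // the third argument is force
Checkpoints can run asynchronously and incrementally, which reduces their impact on data processing
Only the latest Checkpoint has to be kept; older ones can be deleted automatically
A Savepoint is like a Checkpoint; the difference is that a Savepoint is triggered manually and, once created, does not expire
For example, via flink stop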
Suspending job "c2cbdb0bca115d2375706b2c95d8a323" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f
A program can be restored from a given Savepoint
flink run -s /tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f flink-app.jar
For more information, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/checkpointing.html
DataStream Sources, Transformations, Sinks
StreamExecutionEnvironment provides several functions for attaching data sources
readTextFile(String filePath)
readTextFile(String filePath, String charsetName)
readFile(FileInputFormat<OUT> inputFormat, String filePath)
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType,
long interval, TypeInformation<OUT> typeInformation)
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType,
long interval, FilePathFilter filter)
socketTextStream(String hostname, int port)
socketTextStream(String hostname, int port, String delimiter, long maxRetry)
fromCollection(Collection<OUT> data)
fromCollection(Iterator<OUT> data, Class<OUT> type)
fromElements(OUT... data)
fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type)
generateSequence(long from, long to)
Connectors (predefined or custom) can also be added through the addSource function of StreamExecutionEnvironment
addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties))
addSource(new FlinkKinesisConsumer<>("kinesis_stream_name", new SimpleStringSchema(), consumerConfig))
addSource(new TwitterSource(props))
addSource(new RMQSource<String>(connectionConfig, "queueName", true, new SimpleStringSchema()))
addSource(new NiFiSource(clientConfig))
The main DataStream Transformations are
map
flatMap
filter
keyBy
reduce
fold // combines all elements into a single result
sum
min
max
minBy
maxBy
keyBy(0).window
windowAll
windowedStream.apply
allWindowedStream.apply
union
dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2))
.upperBoundExclusive(true).lowerBoundExclusive(true)
dataStream.coGroup(otherStream).where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
someStream.connect(otherStream)
split & select
stream.assignTimestamps // extracts timestamps from the data; these timestamps can be used together with windows
initialStream.iterate() // covered in detail below
dataStream.partitionCustom
dataStream.shuffle
dataStream.rebalance
Iterate Transformation
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate(); // iterate can take a maximum wait time in milliseconds as an argument
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
// Feed the values in stillGreaterThanZero back, so they continue again from iteration.map
// An element keeps looping until it no longer passes the stillGreaterThanZero filter
iteration.closeWith(stillGreaterThanZero);
// Take out the elements that no longer need to iterate
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
env.execute();
Data is buffered during transfer. A buffer timeout can be set; a buffer is sent only when it is full or when the timeout expires
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
DataStream provides several methods for writing data out
writeAsText(String path)
writeAsCsv(String path)
print()
writeUsingOutputFormat(OutputFormat<T> format)
writeToSocket(String hostName, int port, SerializationSchema<T> schema)
DataStream also provides the addSink function for adding connectors (predefined or custom)
stream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "my-topic", new SimpleStringSchema()))
stream.addSink(new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig))
stream.addSink(new BucketingSink<String>("/base/path")) // can be used for HDFS
stream.addSink(new RMQSink<String>(connectionConfig, "queueName", new SimpleStringSchema()))
stream.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...}));
CassandraSink.addSink(result)
.setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
.setHost("127.0.0.1")
.build();
stream.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
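For more information, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html
DataStream Joins
Window Join (within one window, the elements of the two streams that share a key are passed to the join function pairwise; for example, if each side has 10 elements with the same key in the window, the join function runs 100 times)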
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
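The pairwise behaviour of a window join can be illustrated with a small plain-Java sketch. It is not Flink code: `WindowJoinSketch` and `joinWindow` are made-up names, each element is modelled as a `{key, value}` string array, and both lists are assumed to hold the contents of one and the same window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WindowJoinSketch {

    // Pairs every left element with every right element that has the same key,
    // which mirrors what a window join does inside a single window.
    static List<String> joinWindow(List<String[]> left, List<String[]> right) {
        List<String> joined = new ArrayList<>();
        for (String[] l : left) {
            for (String[] r : right) {
                if (l[0].equals(r[0])) {           // where(...).equalTo(...)
                    joined.add(l[1] + "," + r[1]); // one call of the join function
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> left = Arrays.asList(
                new String[]{"k", "l1"}, new String[]{"k", "l2"});
        List<String[]> right = Arrays.asList(
                new String[]{"k", "r1"}, new String[]{"x", "r2"});
        System.out.println(joinWindow(left, right));
    }
}
```

Elements without a partner on the other side (such as the one with key `x`) produce no output, because a window join behaves like an inner join.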
Interval Join
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
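// For every element a of orangeStream, find the greenStream elements with the same key whose timestamp minus a's timestamp lies in [-2 ms, +1 ms] (both bounds are inclusive by default)
// Each of those elements is paired with a and passed to the join function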
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction<Integer, Integer, String>() {
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html
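The interval condition can be checked by hand with a small plain-Java sketch. It is not Flink code: `IntervalJoinSketch` and `intervalJoin` are made-up names, elements are reduced to their timestamps, all elements are assumed to share one key, and both bounds are treated as inclusive.

```java
import java.util.ArrayList;
import java.util.List;

class IntervalJoinSketch {

    // Emits "a,b" for every pair with a + lower <= b <= a + upper.
    static List<String> intervalJoin(long[] left, long[] right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (long a : left) {
            for (long b : right) {
                if (b >= a + lower && b <= a + upper) {
                    out.add(a + "," + b); // one call of processElement(left, right, ...)
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 2, 3, 4, 5, 7}; // timestamps of the left stream
        long[] green = {0, 1, 6, 7};        // timestamps of the right stream
        System.out.println(intervalJoin(orange, green, -2, 1));
    }
}
```

With these sample timestamps the orange element at 4 finds no partner, because no green timestamp falls into [2, 5].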
Process Function
A low-level operation that handles elements one at a time, with access to keyed state and timers
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
DataStream<Tuple2<String, String>> stream = ...;
DataStream<Tuple2<String, Long>> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
public class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
public class CountWithTimeoutFunction
extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(
Tuple2<String, String> value,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
current.count++;
current.lastModified = ctx.timestamp();
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
CountWithTimestamp result = state.value();
if (timestamp == result.lastModified + 60000) {
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
Async I/O
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, port, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
DataStream<String> stream = ...;
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html
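The callback pattern used in asyncInvoke can be tried out with the JDK alone. The sketch below is not Flink code: `AsyncCallbackSketch` and `query` are made-up names, `query` is a hypothetical client that already returns a `CompletableFuture`, and a `Consumer` stands in for Flink's `ResultFuture`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class AsyncCallbackSketch {

    // Hypothetical non-blocking client: returns a future instead of a value.
    static CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> "value-of-" + key);
    }

    // Mirrors asyncInvoke: register a callback and return immediately.
    static void asyncInvoke(String key, Consumer<Collection<String>> resultFuture) {
        query(key).whenComplete((value, error) -> {
            if (error != null) {
                // A real ResultFuture would get completeExceptionally(error) here.
                resultFuture.accept(Collections.<String>emptyList());
            } else {
                resultFuture.accept(Collections.singleton(key + "=" + value));
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> done = new CompletableFuture<>();
        asyncInvoke("user-1", done::complete);
        System.out.println(done.join()); // wait only here, for demonstration
    }
}
```

When the client offers a callback-based or `CompletableFuture`-based API, no thread has to block on `Future.get()` as in the wrapper above.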
DataSet Sources, Transformations, and Sinks
ExecutionEnvironment provides several methods for obtaining data sources:
readTextFile(String filePath)
readTextFileWithValue(String filePath)
readCsvFile(String filePath)
readFileOfPrimitives(String filePath, Class<X> typeClass)
readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass)
fromCollection(Collection<X> data)
fromCollection(Iterator<X> data, Class<X> type)
fromElements(X... data)
fromParallelCollection(SplittableIterator<X> iterator, Class<X> type)
generateSequence(long from, long to)
readFile(FileInputFormat<X> inputFormat, String filePath)
// createInput is a generic method; implementing InputFormat makes it possible to read from all kinds of data sources
createInput(InputFormat<X, ?> inputFormat)
DataSet<Row> dbData = // JDBCInputFormat produces Row records
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("select name, age from persons")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
);
The main transformations supported by DataSet are:
map
flatMap
mapPartition
filter
reduce
reduceGroup
aggregate
distinct
join
leftOuterJoin
coGroup
cross
union
rebalance
partitionByHash
partitionByRange
partitionCustom
sortPartition
first
groupBy
project
The main sink methods provided by DataSet are:
writeAsText(String filePath)
writeAsFormattedText(String filePath, TextFormatter<T> formatter)
writeAsCsv(String filePath)
print()
write(FileOutputFormat<T> outputFormat, String filePath)
// output is a generic method; implementing OutputFormat makes it possible to write to different sinks
output(OutputFormat<T> outputFormat)
myResult.output(
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
.finish()
);
Iteration
// Start from the value 0 and iterate at most 10000 times
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);
DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer i) throws Exception {
double x = Math.random();
double y = Math.random();
return i + ((x * x + y * y < 1) ? 1 : 0);
}
});
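// closeWith(iteration) feeds the result back as the input of the next round; with no termination criterion the loop runs for the full 10000 iterations (the two-argument closeWith can stop earlier, once its termination-criterion DataSet is empty)
DataSet<Integer> count = initial.closeWith(iteration);
// Operations applied after the iteration has finished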
count.map(new MapFunction<Integer, Double>() {
@Override
public Double map(Integer count) throws Exception {
return count / (double) 10000 * 4;
}
}).print();
env.execute("Iterative Pi Example");
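The arithmetic behind this example is a Monte Carlo estimate of pi: a random point in the unit square falls inside the quarter circle with probability pi/4. The following plain-Java sketch performs the same computation in an ordinary loop; `PiSketch`, `estimatePi`, and the fixed seed are illustrative assumptions, not part of the Flink example.

```java
import java.util.Random;

class PiSketch {

    // Counts random points inside the quarter circle, then scales to pi.
    static double estimatePi(int iterations, long seed) {
        Random random = new Random(seed);
        int count = 0;                           // env.fromElements(0)
        for (int i = 0; i < iterations; i++) {   // iterate(iterations)
            double x = random.nextDouble();
            double y = random.nextDouble();
            count += (x * x + y * y < 1) ? 1 : 0; // the map inside the loop
        }
        return count / (double) iterations * 4;  // the map after closeWith
    }

    public static void main(String[] args) {
        System.out.println(estimatePi(10000, 42L));
    }
}
```

With 10000 samples the estimate typically lands within a few hundredths of pi; more iterations narrow the spread.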
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/
Table API & SQL
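Table API and SQL treat DataStreams and DataSets as tables that can be queried with SQL, which unifies stream and batch operations
Flink SQL is built on Apache Calcite
Table API and SQL have two different planners: the old planner and the Blink planner
For some of the differences between the two, see: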
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners
https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
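Table API and SQL support Java, Scala, and Python
Using Python requires installing PyFlink first; the supported Python versions are 3.5, 3.6, and 3.7
python -m pip install apache-flink
Creating a TableEnvironment with the old (Flink) planner and with the Blink planner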
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
Basic usage of Table API and SQL
// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create a Table
tableEnv.connect(...).createTemporaryTable("table1");
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable");
// create a Table object from a Table API query
Table tapiResult = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");
// execute
tableEnv.execute("java_job");
Creating tables
// Register a Table object produced by a query under a table name
Table projTable = tableEnv.from("X").select(...);
tableEnv.createTemporaryView("projectedTable", projTable);
// Connect to an external system (database, file system, message queue, etc.) and register it under a table name
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
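Besides the table name, a catalog and a database can also be specified when creating a table
// the comments below assume these are the current catalog and database
tableEnv.useCatalog("custom_catalog");
tableEnv.useDatabase("custom_database");
Table table = ...;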
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table);
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table);
// register the view named 'View' in the catalog named 'custom_catalog' in the
// database named 'custom_database'. 'View' is a reserved keyword and must be escaped.
tableEnv.createTemporaryView("`View`", table);
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table);
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
Querying tables with the Table API
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// emit or convert Table
// execute query
Querying tables with SQL
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// execute query
Writing to tables
// create an output Table
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT());
tableEnv.connect(new FileSystem("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");
Table result = ...
// create TableSink
TableSink<Row> sink = ...
// register TableSink
tableEnv.registerTableSink(
"outputTable", // table name
new String[]{...}, // field names
new TypeInformation[]{...}, // field types
sink); // table sink
// emit result Table via a TableSink
result.insertInto("outputTable");
Converting a DataStream or DataSet into a table
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
// convert the DataStream into a Table with swapped fields and field names "myString" and "myLong"
Table table3 = tableEnv.fromDataStream(stream, "f1 as myString, f0 as myLong");
// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...
// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
Converting a table into a DataStream or DataSet
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
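What a retract stream carries can be shown with a plain-Java sketch of a continuously updated count per word, roughly what a query such as SELECT word, COUNT(*) ... GROUP BY word would emit through toRetractStream. This is not Flink code: `RetractStreamSketch` and `countWords` are made-up names, and each change is rendered as a "flag,word,count" string instead of a Tuple2<Boolean, Row>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RetractStreamSketch {

    // Emits "true,word,count" for an added row and "false,word,count" for a retracted one.
    static List<String> countWords(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> changes = new ArrayList<>();
        for (String word : words) {
            Long old = counts.get(word);
            if (old != null) {
                changes.add("false," + word + "," + old); // retract the previous result
            }
            long updated = (old == null ? 0L : old) + 1;
            counts.put(word, updated);
            changes.add("true," + word + "," + updated);  // add the new result
        }
        return changes;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("a", "b", "a")));
    }
}
```

An append stream can only represent the "true" messages, which is why a query whose result rows get updated cannot be converted with toAppendStream.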
Connector: connectors make it possible to read from and write to external systems (file systems, databases, message queues, etc.) as tables
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
tableEnvironment
.connect(
new Kafka()
.version("0.10")
.topic("test-input")
.startFromEarliest()
// .startFromLatest()
// .startFromSpecificOffsets(...)
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
// .sinkPartitionerFixed()
// .sinkPartitionerRoundRobin()
)
.withFormat(
new Avro()
.avroSchema(
"{" +
" \"namespace\": \"org.myorganization\"," +
" \"type\": \"record\"," +
" \"name\": \"UserMessage\"," +
" \"fields\": [" +
" {\"name\": \"timestamp\", \"type\": \"string\"}," +
" {\"name\": \"user\", \"type\": \"long\"}," +
" {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
" ]" +
"}"
)
)
.withSchema(
new Schema()
.field("rowtime", DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime() // optional: declares this field as a event-time attribute
// also can declares this field as a processing-time attribute via .proctime()
.timestampsFromField("timestamp")
.watermarksPeriodicBounded(60000) // also can be .watermarksPeriodicAscending()
// or .watermarksFromSource()
)
.field("user", DataTypes.BIGINT())
.field("message", DataTypes.STRING())
)
.createTemporaryTable("MyUserTable");
.connect(
new FileSystem()
.path("file:///path/to/whatever") // required: path to a file or directory
)
.withFormat( // required: file system connector requires to specify a format,
... // currently only OldCsv format is supported.
) // Please refer to old CSV format part of Table Formats section for more details.
.connect(
new Elasticsearch()
.version("6") // required: valid connector versions are "6"
.host("localhost", 9200, "http") // required: one or more Elasticsearch hosts to connect to
.index("MyUsers") // required: Elasticsearch index
.documentType("user") // required: Elasticsearch document type
.keyDelimiter("$") // optional: delimiter for composite keys ("_" by default)
// e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
.keyNullLiteral("n/a") // optional: representation for null fields in keys ("null" by default)
// optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
.failureHandlerFail() // optional: throws an exception if a request fails and causes a job failure
.failureHandlerIgnore() // or ignores failures and drops the request
.failureHandlerRetryRejected() // or re-adds requests that have failed due to queue capacity saturation
.failureHandlerCustom(...) // or custom failure handling with a ActionRequestFailureHandler subclass
// optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
.disableFlushOnCheckpoint() // optional: disables flushing on checkpoint (see notes below!)
.bulkFlushMaxActions(42) // optional: maximum number of actions to buffer for each bulk request
.bulkFlushMaxSize("42 mb") // optional: maximum size of buffered actions in bytes per bulk request
// (only MB granularity is supported)
.bulkFlushInterval(60000L) // optional: bulk flush interval (in milliseconds)
.bulkFlushBackoffConstant() // optional: use a constant backoff type
.bulkFlushBackoffExponential() // or use an exponential backoff type
.bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
.bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)
// optional: connection properties to be used during REST communication to Elasticsearch
.connectionMaxRetryTimeout(3) // optional: maximum timeout (in milliseconds) between retries
.connectionPathPrefix("/v1") // optional: prefix string to be added to every REST communication
)
.withFormat( // required: Elasticsearch connector requires to specify a format,
... // currently only Json format is supported.
// Please refer to Table Formats section for more details.
)
.connect(
new HBase()
.version("1.4.3") // required: currently only support "1.4.3"
.tableName("hbase_table_name") // required: HBase table name
.zookeeperQuorum("localhost:2181") // required: HBase Zookeeper quorum configuration
.zookeeperNodeParent("/test") // optional: the root dir in Zookeeper for HBase cluster.
// The default value is "/hbase".
.writeBufferFlushMaxSize("10mb") // optional: writing option, determines how many size in memory of buffered
// rows to insert per round trip. This can help performance on writing to JDBC
// database. The default value is "2mb".
.writeBufferFlushMaxRows(1000) // optional: writing option, determines how many rows to insert per round trip.
// This can help performance on writing to JDBC database. No default value,
// i.e. the default flushing is not depends on the number of buffered rows.
.writeBufferFlushInterval("2s") // optional: writing option, sets a flush interval flushing buffered requesting
// if the interval passes, in milliseconds. Default value is "0s", which means
// no asynchronous flush thread will be scheduled.
)
// JDBC tables apparently can only be declared through SQL DDL, not through the connect() descriptor API
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'jdbc', -- required: specify this table type is jdbc
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
'connector.table' = 'jdbc_table_name', -- required: jdbc table name
'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of the JDBC driver to use to connect to this URL.
-- If not set, it will automatically be derived from the URL.
'connector.username' = 'name', -- optional: jdbc user name and password
'connector.password' = 'password',
-- scan options, optional, used when reading from table
-- These options must all be specified if any of them is specified. In addition, partition.num must be specified. They
-- describe how to partition the table when reading in parallel from multiple tasks. partition.column must be a numeric,
-- date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide
-- the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.
-- This option applies only to reading.
'connector.read.partition.column' = 'column_name', -- optional, name of the column used for partitioning the input.
'connector.read.partition.num' = '50', -- optional, the number of partitions.
'connector.read.partition.lower-bound' = '500', -- optional, the smallest value of the first partition.
'connector.read.partition.upper-bound' = '1000', -- optional, the largest value of the last partition.
'connector.read.fetch-size' = '100', -- optional, Gives the reader a hint as to the number of rows that should be fetched
-- from the database when reading per round trip. If the value specified is zero, then
-- the hint is ignored. The default value is zero.
-- lookup options, optional, used in temporary join
'connector.lookup.cache.max-rows' = '5000', -- optional, max number of rows of lookup cache, over this value, the oldest rows will
-- be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any
-- of them is specified. Cache is not enabled as default.
'connector.lookup.cache.ttl' = '10s', -- optional, the max time to live for each rows in lookup cache, over this time, the oldest rows
-- will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of
-- them is specified. Cache is not enabled as default.
'connector.lookup.max-retries' = '3', -- optional, max retry times if lookup database failed
-- sink options, optional, used when writing into table
'connector.write.flush.max-rows' = '5000', -- optional, flush max size (includes all append, upsert and delete records),
-- over this number of records, will flush data. The default value is "5000".
'connector.write.flush.interval' = '2s', -- optional, flush interval mills, over this time, asynchronous threads will flush data.
-- The default value is "0s", which means no asynchronous flush thread will be scheduled.
'connector.write.max-retries' = '3' -- optional, max retry times if writing records to database failed
)
.connect(...)
.inAppendMode()
// inAppendMode : a dynamic table and an external connector only exchange INSERT messages.
// inUpsertMode : a dynamic table and an external connector exchange UPSERT and DELETE messages.
// inRetractMode : a dynamic table and an external connector exchange ADD and RETRACT messages.
Table Format
.withFormat(
new Csv()
.withFormat(
new Json()
.withFormat(
new Avro()
explain
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Handling event time and processing time in Table API & SQL
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html
Joins in Continuous Queries
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html
Data Type
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/types.html
Table API & SQL
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/
SQL: Detecting Patterns in Tables
-- Partition MyTable by userid, then order each partition by proctime
-- If three consecutive rows have the names a, b, c, output their ids as aid, bid, cid
SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
PARTITION BY userid
ORDER BY proctime
MEASURES
A.id AS aid,
B.id AS bid,
C.id AS cid
PATTERN (A B C)
DEFINE
A AS name = 'a',
B AS name = 'b',
C AS name = 'c'
) AS T
The pattern-detection feature looks quite powerful
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/match_recognize.html
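The matching rule of the query above (three consecutive rows named a, b, c) can be traced with a plain-Java sketch. It is not Flink code: `MatchRecognizeSketch` and `matchAbc` are made-up names, each row is modelled as an `{id, name}` string array, and the list is assumed to hold the rows of a single userid partition, already ordered by proctime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MatchRecognizeSketch {

    // Scans for three consecutive rows named a, b, c and emits "aid,bid,cid".
    static List<String> matchAbc(List<String[]> rows) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i + 2 < rows.size()) {
            boolean hit = rows.get(i)[1].equals("a")       // DEFINE A AS name = 'a'
                    && rows.get(i + 1)[1].equals("b")      // DEFINE B AS name = 'b'
                    && rows.get(i + 2)[1].equals("c");     // DEFINE C AS name = 'c'
            if (hit) {
                out.add(rows.get(i)[0] + "," + rows.get(i + 1)[0] + "," + rows.get(i + 2)[0]);
                i += 3; // continue after the last row of the match
            } else {
                i++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[]{"1", "x"}, new String[]{"2", "a"}, new String[]{"3", "b"},
                new String[]{"4", "c"}, new String[]{"5", "a"}, new String[]{"6", "b"});
        System.out.println(matchAbc(rows));
    }
}
```

The trailing rows a, b stay unmatched until a row named c arrives directly after them.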
SQL Function
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/
Library CEP
Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.
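// Suppose there is an event whose id is 42,
// immediately followed by an event whose volume is at least 10,
// and later followed by an event named "end" (which need not come directly after the second one);
// find the events that match this pattern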
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
out.collect(createAlertFrom(pattern));
}
});
Specifying how many times an event occurs
Pattern<Event, ?> start = Pattern.<Event>begin("start").where(...);
// expecting 4 occurrences
start.times(4);
// expecting 0 or 4 occurrences
start.times(4).optional();
// expecting 2, 3 or 4 occurrences
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences
start.oneOrMore();
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();
// expecting 0 or more occurrences
start.oneOrMore().optional();
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();
Specifying conditions
// Iterative Conditions
.where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
// Simple Conditions
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
// Combining Conditions
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // some condition
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // or condition
}
});
// Stop condition
.oneOrMore().until(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition
}
});
// Defines a subtype condition for the current pattern.
// An event can only match the pattern if it is of this subtype:
pattern.subtype(SubEvent.class);
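Combining patterns
next() // the directly following event; for (a,b,c) it can match (a,b) but not (a,c)
followedBy() // an event that occurs later; for (a,b,c) it can match either (a,b) or (a,c)
followedByAny() // every later event that satisfies the condition; for (a,b,c,c), matching (a,c)
// yields two (a,c) combinations, whereas followedBy yields only one
notNext() // an event that must not directly follow
notFollowedBy() // an event that must not occur later
next.within(Time.seconds(10)); // restricts the pattern to a time span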
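The difference between the three contiguity modes can be traced with a plain-Java sketch for a two-step pattern. It is a deliberate simplification and not the CEP library: `ContiguitySketch`, `Mode`, and `match` are made-up names, events are plain strings, and after-match skip strategies and time constraints are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ContiguitySketch {

    enum Mode { NEXT, FOLLOWED_BY, FOLLOWED_BY_ANY }

    // Returns index pairs "i-j" where events[i] equals first and events[j] equals second.
    static List<String> match(List<String> events, String first, String second, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(second)) {
                    matches.add(i + "-" + j);
                    if (mode != Mode.FOLLOWED_BY_ANY) {
                        break; // next/followedBy keep only the first partner
                    }
                } else if (mode == Mode.NEXT) {
                    break;     // strict contiguity: a non-matching event ends the attempt
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c", "c");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " " + match(events, "a", "c", mode));
        }
    }
}
```

For the events (a,b,c,c) and the pattern (a,c), strict contiguity finds nothing, relaxed contiguity finds one match, and non-deterministic relaxed contiguity finds two.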
Groups of patterns
// strict contiguity
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
One event may take part in several matches; how this is handled is configurable
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/cep.html#after-match-skip-strategy
For more on CEP, see
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/cep.html
Library Gelly
Gelly is a Graph API for Flink
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/gelly/