本文翻譯自官網:FlinkCEP - Complex event processing for Flink
FlinkCEP是在Flink之上實現的復雜事件處理(CEP)庫。 它使您可以檢測無窮無盡的事件流中的事件模式,從而有機會掌握數據中的重要信息。
本頁描述Flink CEP中可用的API調用。 我們首先介紹模式API,該API允許您指定要在流中檢測的模式,然后介紹如何檢測和處理匹配的事件序列。 然后,我們介紹CEP庫在處理事件時間的延遲時所做的假設,以及如何將作業從較舊的Flink版本遷移到Flink-1.3。
入門
如果您想直接使用,請設置一個Flink程序,並將FlinkCEP依賴項添加到您項目的pom.xml中。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>1.9.0</version> </dependency>
Info: FlinkCEP不是二進制分發包的一部分。 在此處查看如何與它鏈接以執行集群。
現在,您可以開始使用Pattern API編寫第一個CEP程序。
注意:您要向其應用模式匹配的DataStream中的事件必須實現適當的equals()和hashCode()方法,因為FlinkCEP使用它們來比較和匹配事件。
val input: DataStream[Event] = ... val pattern = Pattern.begin[Event]("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end") val patternStream = CEP.pattern(input, pattern) val result: DataStream[Alert] = patternStream.process( new PatternProcessFunction[Event, Alert]() { override def processMatch( `match`: util.Map[String, util.List[Event]], ctx: PatternProcessFunction.Context, out: Collector[Alert]): Unit = { out.collect(createAlertFrom(pattern)) } })
模式API
模式API允許您定義要從輸入流中提取的復雜模式序列。
每個復雜模式序列都包含多個簡單模式,即尋找具有相同屬性的單個事件的模式。 從現在開始,我們將稱這些簡單模式為模式,以及我們在流中搜索的最終復雜模式序列,即模式序列。 您可以將模式序列視為此類模式的圖,其中根據用戶指定的條件(例如,從一個模式到下一個模式)的轉換。 event.getName()。equals(“ end”)。 匹配是一系列輸入事件,通過一系列有效的模式轉換來訪問復雜模式圖的所有模式。
注意:每個模式都必須具有唯一的名稱,稍后您將使用該名稱來標識匹配的事件。
注意:模式名稱不能包含字符":"。
在本節的其余部分,我們將首先描述如何定義單個模式,然后如何將單個模式組合為復雜模式。
單個模式
模式可以是單例或循環模式。 單例模式接受一個事件,而循環模式可以接受多個事件。 在模式匹配符號中,模式“ a b + c?d”(或“ a”,后跟一個或多個“ b”,可選地后跟“ c”,后跟“ d”),a,c ?和d是單例模式,而b +是循環的模式。 默認情況下,模式是單例模式,您可以使用“量詞”將其轉換為循環模式。 每個模式都可以具有一個或多個條件,基於該條件可以接受事件。
量詞
在FlinkCEP中,您可以使用以下方法指定循環模式:pattern.oneOrMore(),用於期望一個或多個給定事件發生的模式(例如,前面提到的b +); 和pattern.times(#ofTimes),用於期望在給定類型的事件中出現特定次數的模式,例如 4次;pattern.times(#fromTimes,#toTimes)用於期望在給定類型的事件中出現特定的最小發生次數和最大發生次數的模式,例如 2-4
您可以使用pattern.greedy()方法使循環模式變得貪婪(注:盡可能的多匹配),但仍無法使組模式變得貪婪。 您可以使用pattern.optional()方法將所有模式(是否循環)設為可選。
對於名為start的模式,以下是有效的量詞:
// expecting 4 occurrences start.times(4) // expecting 0 or 4 occurrences start.times(4).optional() // expecting 2, 3 or 4 occurrences start.times(2, 4) // expecting 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).greedy() // expecting 0, 2, 3 or 4 occurrences start.times(2, 4).optional() // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).optional().greedy() // expecting 1 or more occurrences start.oneOrMore() // expecting 1 or more occurrences and repeating as many as possible start.oneOrMore().greedy() // expecting 0 or more occurrences start.oneOrMore().optional() // expecting 0 or more occurrences and repeating as many as possible start.oneOrMore().optional().greedy() // expecting 2 or more occurrences start.timesOrMore(2) // expecting 2 or more occurrences and repeating as many as possible start.timesOrMore(2).greedy() // expecting 0, 2 or more occurrences start.timesOrMore(2).optional() // expecting 0, 2 or more occurrences and repeating as many as possible start.timesOrMore(2).optional().greedy()
條件
對於每種模式,您都可以指定一個傳入事件必須滿足的條件才能被“接受”到模式中,例如 其值應大於5,或大於先前接受的事件的平均值。 您可以通過pattern.where(),pattern.or()或pattern.until()方法在事件屬性上指定條件。 這些可以是IterativeConditions或SimpleConditions。
迭代條件:這是最通用的條件類型。 這樣,您可以根據先前接受的事件的屬性或部分事件的統計信息來指定接受后續事件的條件。
以下是一個迭代條件的代碼,如果名稱以“ foo”開頭,並且該模式先前接受的事件的價格加上當前價格不超過5.0 ,則接受下一個事件 為“中間”的模式的事件。 迭代條件可能很強大,尤其是與循環模式(例如循環模式)結合使用時 一個或多個()。
middle.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx) => { lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum value.getName.startsWith("foo") && sum + value.getPrice < 5.0 } )
注意:調用ctx.getEventsForPattern(...)會找到給定潛在匹配項的所有先前接受的事件。 該操作的成本可能會有所不同,因此在實施時,請盡量減少其使用。
所描述的上下文也使事件時間特性具有一種訪問方式。 有關更多信息,請參見時間上下文。
簡單條件:這種類型的條件擴展了前面提到的IterativeCondition類,並僅基於事件本身的屬性來決定是否接受事件。
start.where(event => event.getName.startsWith("foo"))
最后,您還可以通過pattern.subtype(subClass)方法將接受事件的類型限制為初始事件類型的子類型(此處為Event)。
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
組合條件:如上所示,您可以將子類型條件與其他條件組合。 這適用於所有條件。 您可以通過順序調用where()任意組合條件。 最終結果將是各個條件結果的邏輯與。
要使用OR合並條件,可以使用or()方法,如下所示。
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
停止條件:如果是循環模式(oneOrMore()和oneOrMore()。optional()),您還可以指定停止條件,例如 接受值大於5的事件,直到值的總和小於50。
為了更好地理解它,請看以下示例。
-
"(a+ until b)"(一個或多個"a"直到"b")的模式 -
一系列傳入事件
"a1" "c" "a2" "b" "a3" -
該庫將輸出結果:
{a1 a2} {a1} {a2} {a3}。
如您所見{a1 a2 a3},{a2 a3}由於停止條件而未返回。
| Pattern Operation | Description |
|---|---|
| where(condition) | 定義當前模式的條件。 為了匹配模式,事件必須滿足條件。 多個連續的where()子句導致其條件與: |
| or(condition) | 添加與現有條件進行“或”運算的新條件。 一個事件只有通過至少一個條件才能匹配模式: |
| until(condition) | 指定循環模式的停止條件。 意味着如果發生符合給定條件的事件,則模式中將不再接受任何事件. 僅與oneOrMore()結合使用 注意:它允許在基於事件的情況下為相應的模式清除狀態. |
| subtype(subClass) | 定義當前模式的子類型條件。 如果事件屬於此子類型,則該事件只能與該模式匹配: |
| oneOrMore() | 指定此模式期望至少發生一次匹配事件. 默認情況下,使用寬松的內部連續性(在后續事件之間)。 有關內部連續性的更多信息,請參見連續. 注意:建議使用until()或within()來啟用清除狀態 |
| timesOrMore(#times) | 指定此模式期望至少出現 #times 次匹配事件. 默認情況下,使用寬松的內部連續性(在后續事件之間)。 有關內部連續性的更多信息,請參見連續. |
| times(#ofTimes) | 指定此模式期望發生匹配事件的確切次數. 默認情況下,使用寬松的內部連續性(在后續事件之間)。 有關內部連續性的更多信息,請參見連續. |
| times(#fromTimes, #toTimes) | 指定此模式期望匹配事件的#fromTimes和#toTimes之間發生. 默認情況下,使用寬松的內部連續性(在后續事件之間)。 有關內部連續性的更多信息,請參見連續. |
| optional() | 指定此模式是可選的,即它可能根本不會發生。 這適用於所有上述量詞. |
| greedy() | 指定此模式為貪婪模式,即將重復盡可能多的匹配。 這僅適用於量詞,目前不支持分組模式. |
組合模式
既然您已經了解了單個模式的描述,那么現在該看看如何將它們組合成完整的模式序列了。
模式序列必須從初始模式開始,如下所示:
val start : Pattern[Event, _] = Pattern.begin("start")
接下來,可以通過在模式序列之間指定所需的連續性條件,將更多模式附加到模式序列中。FlinkCEP支持以下事件之間的連續性形式:
-
嚴格連續性:期望所有匹配事件嚴格地一個接一個地出現,而中間沒有任何不匹配事件。
-
寬松連續性:忽略在匹配事件之間出現的不匹配事件。
-
非確定性寬松連續性:進一步的寬松連續性,允許其他匹配忽略某些匹配事件。
要在連續模式之間應用它們,可以使用:
next()為嚴格的followedBy()為寬松的followedByAny(),用於不確定的寬松鄰接
或
notNext(),如果您不希望某個事件類型直接跟隨另一個事件notFollowedBy(),如果您不希望事件類型介於其他兩個事件類型之間。
注意:模式序列不能以 notFollowedBy() 結尾。
注意:NOT模式不能在可選模式之前。
// strict contiguity val strict: Pattern[Event, _] = start.next("middle").where(...) // relaxed contiguity val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...) // non-deterministic relaxed contiguity val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...) // NOT pattern with strict contiguity val strictNot: Pattern[Event, _] = start.notNext("not").where(...) // NOT pattern with relaxed contiguity val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
寬松連續性意味着僅將匹配第一個后續匹配事件,而對於不確定的寬松連續性,將針對同一開始發出多個匹配項。例如,"a b"給定事件序列的pattern "a", "c", "b1", "b2"將給出以下結果:
-
"a"和"b"之間的嚴格鄰接:{}(不匹配),“ a”之后的“ c”會導致“ a”被丟棄。 -
"a"和"b"之間寬松連續性:{a b1}作為輕松的連續性被視為“跳過不匹配的事件,直到下一個匹配的一個”。 -
“ a”和“ b”之間的不確定確定的寬松連續性:{a b1},{a b2},因為這是最通用的形式。
也可以定義時間約束以使模式有效。例如,您可以通過pattern.within()方法定義一個模式應該在10秒內發生。處理和事件時間都支持時間模式。
注意:模式序列只能具有一個時間約束。如果在不同的單獨模式上定義了多個這樣的約束,那么將應用最小的約束。
next.within(Time.seconds(10))
循環模式內的連續性
您可以在循環模式中應用與上一節中討論的相同的鄰接條件。 連續性將應用於接受到這種模式的元素之間。 為了舉例說明上述內容,請使用輸入“ a”的模式序列“ a b + c”(“ a”,然后是一個或多個“ b”的任意(非確定性寬松)序列,后跟“ c”) “,” b1“,” d1“,” b2“,” d2“,” b3“,” c“將具有以下結果:
-
嚴格連續性:{a b3 c} –在“ b1”之后的“ d1”導致“ b1”被丟棄,由於“ d2”,對於“ b2”也一樣。
-
寬松的連續性:{a b1 c},{a b1 b2 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c}-忽略“ d”。
-
非確定性寬松的連續性:{a b1 c},{a b1 b2 c},{a b1 b3 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c}-請注意{ a b1 b3 c},這是“ b”之間的松弛連續性的結果。
對於循環模式(例如oneOrMore()和times()),默認設置為寬松連續性。如果要嚴格鄰接,則必須使用consecutive()調用顯式指定它,如果要 非確定性寬松鄰接,則可以使用該allowCombinations()調用。
模式組
也可以將模式序列定義為begin,followedBy,followedByAny和next的條件。 模式序列在邏輯上將被視為匹配條件,並且將返回GroupPattern,並且可以應用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),continuous(), allowCombinations()到GroupPattern。
val start: Pattern[Event, _] = Pattern.begin( Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...) ) // strict contiguity val strict: Pattern[Event, _] = start.next( Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...) ).times(3) // relaxed contiguity val relaxed: Pattern[Event, _] = start.followedBy( Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...) ).oneOrMore() // non-deterministic relaxed contiguity val nonDetermin: Pattern[Event, _] = start.followedByAny( Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...) ).optional()
| Pattern Operation | Description |
|---|---|
| begin(#name) | 定義開始模式: |
| begin(#pattern_sequence) | 定義開始模式: |
| next(#name) | 追加一個新模式。 匹配事件必須直接接在先前的匹配事件之后(嚴格連續): |
| next(#pattern_sequence) | 追加一個新模式。 一系列匹配事件必須直接接續先前的匹配事件(嚴格連續): |
| followedBy(#name) | 追加一個新模式。 匹配事件和上一個匹配事件之間可能會發生其他事件(寬松的連續性): |
| followedBy(#pattern_sequence) | 追加一個新模式。 在一系列匹配事件和上一個匹配事件之間可能會發生其他事件(寬松的連續性): |
| followedByAny(#name) | 追加一個新模式。 在匹配事件和先前的匹配事件之間可能會發生其他事件, 並且將為每個替代匹配事件顯示替代匹配(不確定性寬松鄰接): |
| followedByAny(#pattern_sequence) | 追加一個新模式。 在匹配事件序列和先前的匹配事件之間可能會發生其他事件, 並且將為匹配事件的每個替代序列(非確定性寬松連續性)提供替代匹配: |
| notNext() | 附加新的否定模式。 匹配(否定)事件必須直接繼承先前的匹配事件(嚴格連續性)才能放棄部分匹配: |
| notFollowedBy() | 附加新的否定模式。 即使在匹配(負)事件和上一個匹配事件(寬松的鄰接)之間發生其他事件,部分匹配事件序列也將被丟棄: |
| within(time) | 定義事件序列與模式匹配的最大時間間隔。 如果未完成的事件序列超過此時間,則將其丟棄: |
匹配后跳過策略
對於給定的模式,可以將同一事件分配給多個成功的匹配。 要控制將事件分配給多少個匹配項,您需要指定一個名為AfterMatchSkipStrategy的跳過策略。 跳過策略有五種類型,列出如下:
- NO_SKIP:將發出所有可能的匹配項。
- SKIP_TO_NEXT:丟棄以同一事件開始的所有部分匹配,發出的匹配從事件開始。
- SKIP_PAST_LAST_EVENT:丟棄匹配開始后但結束之前,開始的所有匹配。
- SKIP_TO_FIRST:丟棄在匹配開始之后但在PatternName的第一個事件發生之前開始的所有匹配。
- SKIP_TO_LAST:丟棄在匹配開始之后但在PatternName的最后一個事件發生之前開始的所有匹配。
請注意,在使用SKIP_TO_FIRST和SKIP_TO_LAST跳過策略時,還應指定有效的PatternName。
例如,對於給定的模式b + c和數據流b1 b2 b3 c,這四種跳過策略之間的差異如下:
| Skip Strategy | Result | Description |
|---|---|---|
| NO_SKIP | b1 b2 b3 cb2 b3 cb3 c |
找到匹配的b1 b2 b3 c后,匹配過程將不會丟棄任何結果。 |
| SKIP_TO_NEXT | b1 b2 b3 cb2 b3 cb3 c |
找到匹配的b1 b2 b3 c之后,匹配過程將不會丟棄任何結果, 因為沒有其他匹配可以從b1開始. |
| SKIP_PAST_LAST_EVENT | b1 b2 b3 c |
找到匹配的b1 b2 b3 c后,匹配過程將丟棄所有開始的部分匹配. |
SKIP_TO_FIRST[b] |
b1 b2 b3 cb2 b3 cb3 c |
找到匹配的b1 b2 b3 c之后,匹配過程將嘗試丟棄b1之前開始的所有部分匹配, 但沒有此類匹配。 因此,什么都不會被丟棄. |
SKIP_TO_LAST[b] |
b1 b2 b3 cb3 c |
找到匹配的b1 b2 b3 c之后,匹配過程將嘗試丟棄b3之前開始的所有部分匹配。 有這樣的一次匹配 b2 b3 c |
再看看另一個示例,以更好地了解NO_SKIP和SKIP_TO_FIRST之間的區別:模式:(a | b | c)(b | c)c + .greedy d和序列:a b c1 c2 c3 d然后結果將是:
| Skip Strategy | Result | Description |
|---|---|---|
| NO_SKIP | a b c1 c2 c3 db c1 c2 c3 dc1 c2 c3 d |
找到匹配a b c1 c2 c3 d后,匹配過程將不會丟棄任何結果. |
SKIP_TO_FIRST[c*] |
a b c1 c2 c3 dc1 c2 c3 d |
在找到與b c1 c2 c3 d匹配之后,匹配過程將丟棄在c1之前開始的所有部分匹配。 有一個這樣的匹配b c1 c2 c3 d. |
為了更好地理解NO_SKIP和SKIP_TO_NEXT之間的區別,請看以下示例:模式:a b +和序列:a b1 b2 b3然后結果將是:
| Skip Strategy | Result | Description |
|---|---|---|
| NO_SKIP | a b1a b1 b2a b1 b2 b3 |
找到匹配的b1之后,匹配過程將不會丟棄任何結果. |
| SKIP_TO_NEXT | a b1 |
找到匹配的b1之后,匹配過程將丟棄所有從a開始的部分匹配。 這意味着既不能生成b1 b2也不能生成b1 b2 b3. |
要指定要使用的跳過策略,只需通過調用以下內容來創建AfterMatchSkipStrategy:
| Function | Description |
|---|---|
AfterMatchSkipStrategy.noSkip() |
Create a NO_SKIP skip strategy |
AfterMatchSkipStrategy.skipToNext() |
Create a SKIP_TO_NEXT skip strategy |
AfterMatchSkipStrategy.skipPastLastEvent() |
Create a SKIP_PAST_LAST_EVENT skip strategy |
AfterMatchSkipStrategy.skipToFirst(patternName) |
使用引用的模式名稱patternName創建一個SKIP_TO_FIRST跳過策略 |
AfterMatchSkipStrategy.skipToLast(patternName) |
使用引用的模式名稱patternName創建一個SKIP_TO_LAST跳過策略 |
然后通過調用將跳過策略應用於模式:
val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)
注意:對於SKIP_TO_FIRST / LAST,有兩個選項可以處理在沒有元素映射到指定變量時的情況。 默認情況下,將使用NO_SKIP策略。 另一個選擇是在這種情況下引發異常。 可以通過以下方式啟用此選項:
AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
檢測模式
在指定了所需的模式序列之后,是時候將其應用於輸入流以檢測潛在的匹配了。 要針對模式序列運行事件流,必須創建一個PatternStream。 給定一個輸入流輸入,一個模式模式和一個可選的比較器比較器(用於在EventTime的情況下對具有相同時間戳的事件或在同一時刻到達的事件進行排序),您可以通過調用以下代碼來創建PatternStream:
val input : DataStream[Event] = ... val pattern : Pattern[Event, _] = ... var comparator : EventComparator[Event] = ... // optional val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)
輸入流可以是鍵的,也可以是非鍵的,具體取決於您的用例。
注意:將模式應用於非鍵控流將導致並行度等於1的作業。
從模式中選擇
獲得PatternStream之后,可以將轉換應用於檢測到的事件序列。 建議的實現方式是通過PatternProcessFunction。
PatternProcessFunction具有一個processMatch方法,每個匹配事件序列都會調用該方法。 它以Map <String,List <IN >>的形式接收匹配,其中鍵是模式序列中每個模式的名稱,值是該模式所有可接受事件的列表(IN是您的類型 輸入元素)。 給定模式的事件按時間戳排序。 返回每個模式的接受事件列表的原因是,當使用循環模式(例如oneToMany()和times())時,給定模式可能會接受多個事件。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> { @Override public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception; IN startEvent = match.get("start").get(0); IN endEvent = match.get("end").get(0); out.collect(OUT(startEvent, endEvent)); } }
PatternProcessFunction允許訪問Context對象。 有了它,就可以訪問與時間相關的特征,例如currentProcessingTime或當前匹配的時間戳(這是分配給匹配的最后一個元素的時間戳)。 有關更多信息,請參見時間上下文。 通過這種情況,還可以將結果發送到副輸出。
處理超時的部分模式
只要某個模式具有通過inner關鍵字附加的窗口長度,則可能會丟棄部分事件序列,因為它們超過了窗口長度。 要對超時的部分匹配采取行動,可以使用TimedOutPartialMatchHandler接口。 該接口應該以混合樣式使用。 這意味着您還可以使用PatternProcessFunction實現此接口。 TimedOutPartialMatchHandler提供了額外的processTimedOutMatch方法,將為每個超時的部分匹配調用該方法。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> { @Override public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception; ... } @Override public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception; IN startEvent = match.get("start").get(0); ctx.output(outputTag, T(startEvent)); } }
注意:processTimedOutMatch不能訪問主輸出。 但是,仍然可以通過Context對象通過側面輸出發出結果。
Convenience API
前面提到的PatternProcessFunction是在Flink 1.8中引入的,從那時起,它是與匹配項進行交互的推薦方法。 仍然可以使用老式的API,例如select / flatSelect,該API在內部將轉換為PatternProcessFunction。
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) val outputTag = OutputTag[String]("side-output") val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){ (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) => out.collect(TimeoutEvent()) } { (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) => out.collect(ComplexEvent()) } val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
CEP庫中的時間
處理事件時間的延遲
在CEP中,處理元素的順序很重要。 為了確保在事件時間內工作時按正確的順序處理元素,將傳入的元素最初放在緩沖區中,在該緩沖區中,元素根據其時間戳按升序排序,並且當水印到達時,該緩沖區中的所有元素 小於水印的時間戳被處理。 這意味着水印之間的元素按事件時間順序進行處理。
注意:在事件時間內工作時,cep 庫假定水印正確無誤。
為了確保跨水印的元素按事件時間順序進行處理,Flink的CEP庫假定水印是正確的,並將其時間戳小於最后看到的水印的時間戳視為晚元素。 后期元素不會進一步處理。 另外,您可以指定sideOutput標簽來收集在最后看到的水印之后出現的后期元素,您可以像這樣使用它。
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) val lateDataOutputTag = OutputTag[String]("late-data") val result: SingleOutputStreamOperator[ComplexEvent] = patternStream .sideOutputLateData(lateDataOutputTag) .select{ pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent() } val lateData: DataStream[String] = result.getSideOutput(lateDataOutputTag)
Time context
在PatternProcessFunction和IterativeCondition中,用戶可以訪問實現TimeContext的上下文,如下所示:
/** * Enables access to time related characteristics such as current processing time or timestamp of * currently processed element. Used in {@link PatternProcessFunction} and * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition} */ @PublicEvolving public interface TimeContext { /** * Timestamp of the element currently being processed. * * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this * will be set to the time when event entered the cep operator. */ long timestamp(); /** Returns the current processing time. */ long currentProcessingTime(); }
此上下文使用戶可以訪問已處理事件的時間特征(在IterativeCondition情況下為傳入記錄,在PatternProcessFunction情況下為匹配)。 調用TimeContext#currentProcessingTime始終會為您提供當前處理時間的值,並且此調用應優先於例如 調用System.currentTimeMillis()。
如果使用TimeContext#timestamp(),則返回值等於使用EventTime時分配的時間戳。 在ProcessingTime中,該時間等於所述事件進入cep運算符的時間點(或在PatternProcessFunction的情況下生成匹配項的時間點)。 這意味着該值在對該方法的多次調用中將保持一致。
Examples
以下示例在事件的鍵控數據流上檢測模式的開始,中間(名稱=“錯誤”)->結束(名稱=“嚴重”)。 這些事件由其ID進行鍵控,並且有效模式必須在10秒內發生。 整個處理過程隨事件時間而定。
val env : StreamExecutionEnvironment = ... env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val input : DataStream[Event] = ... val partitionedInput = input.keyBy(event => event.getId) val pattern = Pattern.begin[Event]("start") .next("middle").where(_.getName == "error") .followedBy("end").where(_.getName == "critical") .within(Time.seconds(10)) val patternStream = CEP.pattern(partitionedInput, pattern) val alerts = patternStream.select(createAlert(_))
從舊版本的Flink遷移(1.3版之前)
遷移到1.4+
在Flink-1.4中,刪除了CEP庫與<= Flink 1.2的向后兼容性。 不幸的是,不可能恢復曾經使用1.2.x運行的CEP作業。
遷移到1.3.x
Flink-1.3中的CEP庫附帶了許多新功能,這些新功能導致API發生了一些更改。 在這里,我們描述了您需要對舊的CEP作業進行的更改,以便能夠使用Flink-1.3運行它們。 進行這些更改並重新編譯作業后,您將能夠從作業的舊版本獲取的保存點恢復執行,即無需重新處理過去的數據。
所需的更改是:
-
更改條件(
where(...)子句中的條件)以擴展SimpleCondition類,而不是實現FilterFunction接口。 -
更改作為select(...)和flatSelect(...)方法的參數提供的函數,以獲取與每個模式關聯的事件列表(Java中為List,Scala中為Iterable)。 這是因為通過添加循環模式,多個輸入事件可以匹配單個(循環)模式。
-
Flink 1.1和1.2中的followedBy()隱含了不確定的寬松連續性(請參見此處)。 在Flink 1.3中,此更改已更改,並且followBy()表示寬松鄰接,而如果需要非確定性寬松鄰接,則應使用followByAny()。
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

