原文地址: 大數據計算引擎之Flink Flink CEP復雜事件編程
復雜事件編程(CEP)是一種基於流處理的技術,將系統數據看作不同類型的事件,通過分析事件之間的關系,建立不同的時事件系序列庫,並利用過濾、關聯、聚合等技術,最終有簡單事件產生高級事件,並通過模式規則的方式對重要信息進行跟蹤和分析,從實時數據中心發掘有價值的信息。復雜事件處理主要應用於防范網絡欺詐、設備故障檢測、風險規避和智能營銷等領域。目前主流的CEP工具具有Esper,Jboss Drools和上夜班的MicroSoft StreamInsight等,Flink基於DataStream API提供了FlinkCEP組件棧,專門用於對復雜事件的處理,幫助用戶從流式數據中發掘有價值的信息。
基礎概念
FlinkCEP 說明
一個或多個由簡單事件構成的事件流通過一定的規則匹配,然后輸出用戶想得到的數據,滿足規則的復雜事件。具備如下的特征:
- 目標:從有序的簡單事件流中發現一些高階特征
- 輸入:一個或多個由簡單事件構成的事件流
- 處理:識別簡單事件之間的內在聯系,多個符合一定規則的簡單事件構成復雜事件
- 輸出:滿足規則的復雜事件

CEP用於分析低延遲、頻繁產生的不同來源的事件流。 CEP 可以幫助在復雜的、不相關的事件流中找出有意義的模式和復雜的關系,以接近實時或准實時的獲得通知並阻止一些行為。
CEP支持在流 上進行模式匹配,根據模式的條件不同,分為連續的條件或不連續的條件;模式的條件允許有時間的限制,當在條件范圍內沒有達到滿足的條件時,會導致模式匹配超時。
CEP用於分析低延遲、頻繁產生的不同來源的事件流。 CEP 可以幫助在復雜的、不相關的事件流中找出有意義的模式和復雜的關系,以接近實時或准實時的獲得通知並阻止一些行為。
環境准備
這里,我們需要引入相關的依賴包。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>1.9.0</version>
</dependency>
基本概念
事件定義
簡單事件
簡單事件存在於現實場景中,主要特點為處理單一事件,事件的定義可以直接觀察出來,處理過程中無需關注多個事件之間的關系,能夠通過簡單的數據處理手段將結果計算出來。
復雜事件
相對於簡單事件,復雜事件處理的不僅是單一的事件,也處理由多個事件組成的復合事件復雜事件處理監測分析事件流(Event Streaming),當特定事件發生時來觸發某些動作。
事件關系
復雜事件中事件間包含多種關系,常見的有時序關系、聚合關系、層次關系、依賴關系以及因果關系。
時序關系
動作事件與動作事件之間、動作事件與狀態變化事件之間,都存在時間順序。事件和事件的時序關系決定了大部分的時序規則,例如: A 事件狀態持續為 1 的同時 B 事件狀態為 0 等;
聚合關系
動作事件和動作事件之間,狀態事件和狀態事件之間都存在聚合關系,即個體聚合形成整體集合。例如: A事件狀態為 1 的次數為 10 觸發預警。
層次關系
動作事件和動作事件之間,狀態事件和狀態事件之間都存在層次關系,即父類事件和子類事件的層次關系,從父類到子類是具體化的,從子類到父類是泛化的。這個可以對比Java里面的繼承關系。
依賴關系
事物的狀態屬性之間彼此的依賴關系和約束關系,例如 A事件狀態觸發的條件前提是B事件觸發,則AB之間形成了依賴關系。
因果關系
對於完整的動作過程,結果狀態為果,初始化狀態和動作都可以視為因。
事件處理
復雜事件處理的目的是通過相應的負責對實時數據執行形影的處理策略,這些策略包括了推斷、查因、決策、預測等方面的應用。
事件推斷
主要利用事務狀態之間的約束關系,從一部分狀態屬性值可以推斷出另一部分的狀態屬性值。舉個栗子:1,1,2,3,5,8 ...... ,我們可以推斷出后面的是: 13,21 ......
事件查因
當出現結果狀態,並且知道初識狀態,可以查明某個動作的原因;同樣,知道結果,知道過程,就可以查明初始狀態的原因。這個相當於:f(x) = kx + b ,知道f(x) , 知道 kx+b , 那我們就知道 x .
事件決策
想得到某個結果狀態,知道初始狀態,決定執行什么動作。該過程和規則引擎相似,例如某個規則符合條件后出發行動,然后執行報警等操作。
事件預測
該種情況知道事件初始狀態,以及將要做的動作,預測未來發生的結果狀態。例如:天氣預報。
Pattern API
FlinkCEP 提供了 Pattern API 用於對輸入流數據的復雜事件規則定義,並從事件流中抽取事件結果。

每個Pattern 都應該包含幾個步驟,或者叫做 state 。從一個 state 到另一個 state . 例如:
Pattern.begin[LoginEvent]("begin")
.where(_.eventType.equals("fail"))
.next("next")
.where(_.eventType.equals("fail"))
.within(Time.seconds(5))
// 或者如下
Pattern.begin[Event]("start")
.where(_.typeEvent.equals("temperature"))
.next("middle")
.subtype(classOf[TempEvent])
.where(_.temp > 35.0)
.followedBy("end")
.where(_.name.equals("end"))
說明:
- 每一個state都應該有一個標識,比如:
begin[LoginEvent]("begin")這里的 "begin" 和begin[Event]("start")這里的 "start". - 每個state 都需要有一個唯一的名字,而且需要一個 filter 來過濾條 件,這個過濾條件定義事件需要符合的條件.例如:
.where(_.eventType.equals("fail")) - 我們也可以通過 subtype 來限制 Event的子類型,例如:
.subtype(classOf[TempEvent]) - 事實上,你可以多次調用subtype 和 where 方法;而且如果 where 條件是不相關的,你可以通過 or 來指定一個單獨的 filter 函數:
pattern.where(...).or(...); - 之后,我們可以在此條件基礎上,通過next 或者 follow edBy 方法切換到下一個state next 的意思是說上一步符合條件的元素之后緊挨着的元素;而 followedBy 並不要求一定是挨着的元素。這兩者分別稱為嚴格近鄰和非嚴格近鄰。
- 最后,我們可以將所有的Pattern 的條件限定在一定的時間范圍內:
within(Time.seconds(5)) - 時間可以是 Processing Time , 也可以是 Event Time.
Pattern檢測
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
patternStream.select(loginEventSource.keyBy(_.userID) , loginfailPattern)
一旦獲得PatternStream ,我們就可以通過 select 或 flatSelect ,從一個 Map 序列找到我們需要的警告信息。
select
select方法需要實現一個 PatternSelectFunction ,通過 select 方法來輸出需要的警告。它接受一個 Map 對,包含 string/event ,其中 key 為 state 的名字, event 則為真實的 Event。
val loginfailPattern = patternStream.select(
(pattern: Map[String, Iterable[LoginEvent]]) => {
val first = pattern.getOrElse("begin", null).iterator.next()
val second = pattern.getOrElse("next", null).iterator.next()
Warning(first.userID, first.eventTime, second.eventTime, "warning")
})
其返回值僅為1 條記錄。
flatSelect
通過實現PatternFlatSelectFunction ,實現與 select 相似的功能。唯一的區別就是 flatSelect 方法可以返回多條記錄, 它通過一個 Collector[OUT] 類型的參數來將要輸出的數據傳遞到下游。
超時事件的處理
通過 within 方法,我們的 parttern 規則 將匹配的事件 限定在一定的窗口范圍內。當有超過窗口時間 之 后 到達的 event ,我們可以通過在 select 或 flatSelect 中,實現PatternTimeoutFunction 和 PatternFlatTimeoutF unction 來處理這種情況。
val out: OutputTag[String] = OutputTag[String]("side-output")
patternStream.select(out){
(pattern:Map[String,Iterable[Event]],timestamp:Long)=>{
TimeoutEvent()
}{
(pattern:Map[String,Iterable[Event]],timestamp:Long)=>{
ComplexEvent()
}
}
}
大體的看完之后,我們FlinkCEP編程也基本就是醬紫。那現在就來詳細一點的說一下。
模式定義
個體Pattern可以是單次執行模式,也可以是循環執行模式。單次執行模式一次只接受一個事件,循環模式可以接受多個事件。通常情況下,可以指定循環次數將單次執行模式變為循環執行模式。每種模式能夠將多個條件組合應用到同一事件之上,條件組合可以通過 where 方法進行疊合。
個體 Pattern 都是通過 begin 方法定義的,例如以下通過 Pattern.begin 方法基於 Event 事件類型的 Pattern , 其中<start_pattern> 時指定的 PatternName 對象。
val start = Pattern.begin[Event]("start_pattern")
下一步通過 Pattern.where() 方法在 Pattern 指定 Condition , 只有當 Condition 滿足之后,當前的 Pattern 才會接收事件。
start.where(_.typeEvent.equals("temperature"))
指定循環次數
對於已經創建好的 Pattern , 可以指定循環次數,形成循環執行的 Pattern , 且有 3 種 方式來指定循環方式。
- times : 可以通過 times 指定固定的循環執行次數
// 指定2循環觸發 4 次
start.times(4)
// 可以指定循環次數范圍
start.times(2 , 4)
- optional : 也可以通過 optional 關鍵字指定要么不觸發,要么觸發指定次數
// 指定2循環觸發 4 次
start.times(4).optional()
// 可以指定循環次數范圍
start.times(2 , 4).optional()
- greedy: 可以通過 greedy 將 Pattern 標記為 貪婪模式,在 Pattern 匹配成功的前提下,會盡可能多的觸發
// 觸發 2,3,4 次,盡可能重復執行
start.times(2 , 4).optional()
// 觸發 0,2,3,4 次,盡可能重復執行
start.times(2 , 4).optional().greedy()
- oneOrMore: 可以通過 oneOrMore 方法指定觸發一次或多次
// 觸發一次或者多次
start.oneOrMore()
// 觸發一次或者多次,盡可能重復執行
start.oneOrMore().greedy()
// 觸發 0 次或者 多次
start.oneOrMore().optional()
// 觸發 0 次或者 多次 , 盡可能多次執行
start.oneOrMore().optional().greedy()
- timesOrMore: 通過 timesOrMore 方法可以指定觸發固定次數以上,例如執行兩次以上:
// 觸發兩次或者多次
start.timesOrMore(2)
// 觸發兩次或者多次,盡可能多次重復執行
start.timesOrMore(2).greedy()
模式條件
每個模式都需要指定觸發條件,作為時間進入到該模式是否接受的判斷依據,當時間中的數值滿足了條件,便進行下一步操作。在FlinkCEP中通過 patter.where()、pattern.or()、及patter.until()方法來為 Pattern 指定條件,且 Pattern 條件有 Iterative Conditions 、 Simple Conditions 及 Combining Conditions 三中類型。
迭代條件
Iterative Conditions 能夠對前面模式所有接收的事件進行處理,根據接收的事件集合統計出計算指標,並作為本次模式匹配中的條件輸入參數。如:
.oneOrMore
.subtype(classOf[TempEvent])
.where(
(value , ctx) => {
// the condition for you
}
)
通過 subtype 將 Event 事件轉換為 TempEvent 事件,然后在 where 條件中通過使用 ctx.getEventsForPattern(...) 方法獲取 “middle” 模式所有接收得到 Event 記錄,並基於這些 Event 數據之上對溫度求取平均值,然后判斷當前事件的溫度是否小於平均值,然后判斷當前事件的溫度是否小於平均值。
簡單條件
Simple Condition 繼承於 Iternative Condition 類,其主要根據事件中的字段信息進行判斷,決定是否接受該事件。如下:
start.where(event=>event.enevtType.equals("temperature"))
同樣,我們可以通過 subtype 對事件進行子類類型轉換,然后在 where 方法中針對子類定義模式條件。
組合條件
組合條件是將簡單條件進行合並,通常情況也可以使用 where 方法進行條件組合,默認每個條件通過 AND 邏輯相連。如果需要使用 OR 邏輯 , 如:
pattern.where(event => event.name.startWith("foo").or(event => enevt.eventType.equals("temperature")))
終止條件
如果程序中使用了 oneOrMore 或者 oneOrMore().optional() 方法,則必須指定終止條件,否則模式中的規則會一直循環下去,如:
patern.oneOrMore().until(event => event.name.equals("end"))
請注意:在上述的迭代條件通過調用 ctx.getEventsForPattern("middle")
模式序列
將互相獨立的模式進行組合然后形成模式序列。模式序列基本的編寫方式和獨立模式一致,各個模式之間通過鄰近條件進行連接即可。其中有嚴格鄰近,寬松臨近,非確定寬松臨近三種臨近連接條件,如下:
val start : Pattern[]
嚴格鄰近
嚴格鄰近條件中,需要所有的事件都按照滿足模式條件,不允許忽略任意不滿足的模式。如下:在start Pattern 后使用 next 方法指定 下一個 Pattern ,生成嚴格鄰近的 Pattern.
val strict : Pattern[Event,_] = start.next("middle").where(...)
寬松鄰近
在寬松鄰近條件下,會忽略沒有成功匹配模式條件,並不會像嚴格鄰近要求的那么高,可以簡單理解為 OR 的邏輯關系。如下:
val strict : Pattern[Event,_] = start.followeBy("middle").where(...)
非確定寬松鄰近
和寬松鄰近條件相比,非確定寬松鄰近條件指在模式匹配過程中可以忽略已經匹配的條件。如下:
```scala
val nonDetermin : Pattern[Event,_] = start.followerByAny("middle").where(....)
除了上述條件外, Flink 還提供了 notNext()、notFollowerBy()等鏈接條件 。notNext() 表示不想讓某一模式跟另一個模式之后不發生;notFollowerBy() 強調不想讓某一模式觸發處於兩個模式之間觸發。
注意點:模式序列不能以 notFollowerBy() 結尾,且 not 類型的模式不能和 optional 關鍵字同時使用
模式組
模式序列可以作為 begin , followerBy , floowerByAny 及 next 等連接條件的輸入參數從而形成的模式組。在GroupPattern 上可以指定 oneOrMore 、 times 、 optional 等循環條件,應用在 GroupPattern 中的模式序列上,每個模式序列完成自己內部的條件匹配,最后在米歐式組層面對模型序列結果進行匯總。如:
val value: GroupPattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start")
.where(_.name.equals("name"))
.followedBy("start_middle")
.where(_.name.equals("yang")))
val value1: Pattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start")
.next("next_start")
.where(_.name.equals("name"))
.followedBy("next_middle")
.where(_.name.equals("yang"))).times(3)
AfterMatchSkipStrategy
在給定的 Pattern 中,當同一事件符合多種模式條件組合之后,需要指定 AfterMatchSkipStrategy 策略以處理已經匹配的事件。在 AfterMatchSkipStrategy 配置中有四件事件處理策略,分別為 NO_SKIP / SKIP_PAST_LAST_EVENT / SKIP_TO_FIRST / SKIP_TO_LAST 。 每種策略的定義和使用方式如下:其中SKIP_TO_FIRST 和 SKIP_TO_LAST 在定義過程中需要指定有效的PatternName.
- [ ] NO_SKIP: 該策略表示將所有可能匹配的事件進行輸出,不忽略任何一條。
AfterMatchSkipStrategy.noSkip()
- [ ] SKIP_PAST_LAST_EVENT: 該策略表示忽略從模式條件開始觸發到當前觸發 Pattern 中的所有部分匹配事件。
AfterMatchSkipStrategy.skipPastLastEvent()
- [ ] SKIP_TO_FIRST: 該策略表示忽略第一個匹配指定 PatternName 的 Pattern 其之前的部分匹配事件。
AfterMatchSkipStrategy.skipToFirst(patternName)
- [ ] SKIP_TO_LAST 該策略表示忽略最后一個匹配指定 PatternName 的 Pattern 之前的部分匹配之間
AfterMatchSkipStrategy.skipToLast(patternName)
- [ ] SKIP_TO_NEXT: 該策略表示忽略指定 PatternName 的 Pattern 之后的部分匹配事件
AfterMatchSkipStrategy.skipToNext(patternName)
選擇完 AfterMatchSkipStrategy 之后,可以再創建 Pattern 時 , 通過 begin 方法中指定 skipStrategy , 然后就可以將 AfterMatchSkipStrategy 應用到當前的 Pattern 中。
val skipStrategy = { }
Pattern.begin("pattern_name" , skipStrategy)
事件獲取
對於前面已經定義的模式序列或模式組,需要和輸入數據流進行結合,才能發現事件中潛在的匹配關系。如:
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)
FlinkCEP 提供了 CEP.pattern 方法將 DataStream 和 Pattern 應用在一起,得到 PatternStream 類型數據集,且后續時間數據獲取都基於PatternStream 進行。另外可以選擇創建 EventComparator , 對傳入的 Pattern 中的事件 進行排序,當 Event Time 相等或者同時 到達 Pattern 時 , EventComparator 鍾定一的排序策略可以幫助事件的先后順序。
當可以 CEP.pattern 方法被執行后,會生成 PatternStream 數據集,該數據集中包含了所有匹配事件。目前在FlinkCEP中提供了 select 和 flatSelect 兩種方法從 PatternStream 提取事件結果。
通過 Select Function 抽取正常事件
可以通過在 PatternStream 的 Select 方法中傳入自定義 Seclect Function 完成對匹配事件的轉換與輸出。其中 Select Function 的輸入參數為 Map[String,Iterable[IN]],Map 中的 Key 為模式序列中的 Pattern 名稱, Value 為對應 Pattern 所接受的事件集合,格式為輸入事件的數據類型。需要注意的是: Select Funtion將會在每次調用后僅輸出一條結果 如下:
def selectFunction (pattern:Map[String,Iterable[IN]]):OUT = {
// 獲取 pattern 中的 startEvent
val startEvent = pattern.get("start_pattern").get.next
// 獲取 pattern 中的 middleEvent
val middleEvent = pattern.get("middle_pattern").get.next
// 返回結果
OUT(startEvent , middleEvent)
}
通過 Select Function 抽取超時事件
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
// 創建 OutputTag ,並命名為 timeout-output
val timeoutTag: OutputTag[String] = OutputTag[String]("timeout-output")
// 調用 PatternStream Select() 並指定 timeoutTag
patternStream.select(timeoutTag) {
// 超時時間獲取
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
TimeOutEvent()
}
}{
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
NormalEvent()
}
// 調用 getSideOutput 方法,並指定 timeoutTag 將超時事件輸出
val timeoutResult : DataStream[TimeOutEvent] = result.getSideOutput(timeoutTag)
}
通過 Flat Select Function 抽取正常事件
Flat Seclect Function 和 Select Function 相似,不過 Flat Select Function 在每次調用可以返回任意數量的結果。因為 Flat Select Function 使用 Collector 作為返回結果的容器,可以將需要輸出的事件都放置在 Collector 中返回。如下:
def faltSelectFunction(pattern:Map[String,Iterable[IN]],collector:Collector[OUT])={
// 獲取 pattern 中的 startEvent
val startEvent = pattern.get("start_pattern").get.next
// 獲取 pattern 中的 middleEvent
val middleEvent = pattern.get("middle_pattern").get.next
// 根據 startEvent 返回結果
for (i <- 0 to startEvent.value){
collector.collect( OUT(startEvent , middleEvent))
}
}
通過 Flat Select Function 抽取超時事件
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
// 創建 OutputTag ,並命名為 timeout-output
val timeoutTag: OutputTag[String] = OutputTag[String]("timeout-output")
// 調用 PatternStream Select() 並指定 timeoutTag
patternStream.select(timeoutTag) {
// 超時時間獲取
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long , out:Collector[TimeoutEvent]) => {
out.collect(TimeOutEvent())
}
}{
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
out.collect(NormalEvent())
}
// 調用 getSideOutput 方法,並指定 timeoutTag 將超時事件輸出
val timeoutResult : DataStream[TimeOutEvent] = result.getSideOutput(timeoutTag)
}
