復雜事件處理引擎—Esper 處理模型


1、esper的處理模型是持續性的——根據statement中事件流(event stream)、視圖(views)、過濾器(filters)等的選擇,esper引擎一旦處理事件數據,就會變更statement中監聽或subscriber接收到事件信息。

2、insert Stream — 表示新事件進入到引擎,並進入到事件窗口等。

先看個例子 :

select * from Withdrawal

這個例子的IStream 其實就是進入引擎的withdrawal事件流,並作為新事件被推送給listener。如下圖所示:

說明:按照上面的EPL語句:select * from withdrawal; 每當有一個withdrawal事件進入時,如W1、W2等,一旦進入到引擎,就會當成新事件(New Events)立刻推送給updatelistener。

3、IRStream — Insert and Remove Stream  

當使用IRStream時,EPL中就會有事件窗口——長度窗口(length window)或者 時間窗口(time window)的使用。比如下面的EPL語句:

select * from Withdrawal.win:length(5)

這個EPL語句使用了長度窗口(length window)— win:length(N)。表示引擎會將過去的n條事件保存在事件流中。如下圖所示:

說明:上圖表示的是使用長度為5的事件窗口,事件的進入以及stream中事件信息的變化,通過引擎推送給監聽的事件信息。

win:length(5) 在Stream中最多保存5條數據,參考W5、W6事件的進入。當W5作為新事件進入到事件窗口時,此時窗口中的數據條數為5,達到了窗口的最大長度;W6事件進入時,則把W1從窗口中移除出去——遵循的是FIFO原則(先進先出),每一個進入的新事件都會在監聽中作為新事件輸出,只有窗口長度5的情況下,才會有舊事件的輸出。比如當W6進入時,監聽中的新事件為W6,而W1則作為舊事件被監聽獲取。

4、過濾器(filter)和where語句

首先,從一個EPL開始:

select * from Withdrawal(amount>=200).win:length(5)

在這個EPL中,有一個特殊的語法也就是 Withdrawal(amount>=200),通過Stream(表達式)的語法,即為filter。其實現的功能是對即將進入到事件窗口的事件進行過濾,滿足條件的事件,則被放入到窗口中。上面EPL表達的是 只有amount >= 200的withdrawal事件,才可以被放入到長度為5的事件窗口。換句話說,這個事件窗口中所有的事件,其amount屬性都不小於200。如下圖:

說明:每一個進入的事件,首先通過filter,當滿足fiter條件時,才會放入到事件窗口;而進入事件窗口的同時,引擎也會將該事件作為新事件推送給監聽或者subscriber。

where語句與filter有所不同,如EPL語句:

select * from Withdrawal.win:length(5) where amount >= 200

以及事件通過where過濾的處理模型如下:

說明:當有新事件進入時,會先進入到事件窗口;在引擎要將事件推送給監聽之前,判斷where條件,滿足where條件的事件,才會作為新事件傳送給監聽。

5、時間窗口(time window)

時間窗口,是一個滑動的事件窗口,其以系統時間為准,延伸到過去指定的時間間隔。比如win:time(10 seconds),這個時間窗口保存的事件是當前時間以及此前10秒這一時間間隔的所有事件。比如下面的EPL語句:

select * from Withdrawal.win:time(4 sec)

表示時間窗口中的事件是過去4秒鍾所有的withdrawal事件。如下圖所示:

說明:當第一個事件W1在t+4時刻進入到引擎時,其時間窗口從t到t+4這一時間段,只有一個事件W1,同時該事件作為新事件推送給監聽;當在t+5時刻,W2進入到引擎,此時事件窗口的時間范圍為 t+1 ~ t+5,窗口數據為 W1和W2,而此時W2也作為新事件輸出到監聽。時間窗口隨着系統時間的變化,其窗口表示的時間范圍也發送變化,當在t+8時,因為在t+4(其實是個臨界點)這個時刻進入的W1,因為已經不在該時間窗口,故W1作為舊事件被推送給監聽。

6、批量窗口(batch window)

批量窗口包括 時間批量窗口(win:time_batch)和長度批量窗口(win:length_batch)。

首先從時間批量開始,Time bath view緩存事件信息並且按照指定時間間隔在一次變更中釋放所有緩存的事件。EPL如下:

select * from Withdrawal.win:time_batch(4 sec)

上述時間批量窗口表示每隔4s形成一個事件窗口,老的窗口中的所有事件則作為新事件推送給監聽。如下圖:

說明:

· t+1時,W1事件發生並進入批量緩存,此時不會通知監聽。

· t+3時,W2事件發生並進入批量緩存,不通知監聽。

· t+4時,滿足了窗口間隔時間,此時緩存中有兩個事件W1W2,引擎處理,並通知監聽,此時輸出事件為 W1和 W2。此時創建一個新的bath buffer

· t+6 與 t+7之間有事件 W3進入bath buffer,監聽無動作。

· t+8時,引擎處理bath緩存中的事件,並傳遞給監聽。此時輸出事件為 W3Old Events 中包括了 W1W2.

長度批量窗口,基本上與時間批量窗口一樣,比如:

select * from withdrawal.win:length_batch(5)

上面的長度批量窗口,每當窗口事件總數達到5條時,則創建一個新的batch buffer,而老的事件窗口中5條事件作為新事件輸出到監聽。

【總結】filter和where的區別在於條件執行的時機——fiter是事件進入事件窗口之前就進行了過濾,不滿足條件的事件不會進入到窗口,更不會交付給引擎進行處理;而where則是從事件窗口中取出事件,通過引擎進行條件篩選,滿足條件的事件則作為新事件交付給監聽。從這個地方,可以看出,在過濾相同條件時,filter的效率會高於where,所以在能使用filter的時候,盡量不要使用where語句進行事件篩選。

事件窗口——時間窗口和長度窗口,這里時間窗口時一個滑動的窗口,隨着時間推移,窗口也在不斷移動;長度窗口更像是一個固定長度的queue,當達到窗口的總容量時,移除窗口中最先進入的事件(FIFO),並作為舊事件交付給監聽。

批量窗口,其實就是每個多久或者每個多少條事件做一次輸出,本次輸出的內容為新事件;當下一次輸出時,上一次輸出的新事件也就成了本次輸出的舊事件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM