深入淺出事件流處理NEsper(三)


首先介紹一下NESPER的大體結構,NEsper從內容上分為兩塊,NEsper的核心NEsper.dll和NEsper.IO.dll。

(1)NEsper的核心包包含了EPL語法解析引擎,事件監聽機制,事件處理等核心模塊。

(2)NEsper的io包含從各種數據源讀取數據以及將輸出結果寫入各種數據源,包括excel,database,msmq,http,socket,XML。

貼一張esper官網上的結構圖,方便大家了解esper的結構

nesper

 

接下來對上述結構圖進行詳細的解釋讓大家加深對ESPER的了解

1. Event對象:NESPER處理的事件的最小單位,一個任意的CLR對象,屬性支持簡單的CLR類型、數組、map、以及嵌套POCO,很靈活.

2.EPL:EPL是ESPER的核心,它類似於SQL,但是和SQL的執行方式不同。

SQL是數據在那里,你每次執行SQL就會觸發一次查詢;而EPL是查詢在這里,數據輸入達到一定條件即可觸發查詢。

這個條件可以有多種:

a).每個event對象來就觸發一次查詢,並只處理當前對象

select * from OrderEvent

這個EPL語句會在每個OrderEvent對象到達后,並將該event交給后續的Listener(后面會講到)來進行處理。但是這種用法不多見,意義不大。

b).窗口處理模式:

EPL最大的特色就是這個窗口處理模式,有兩種窗口,時間窗口和長度窗口。

時間窗口 : 大家想一下,如果有一個場景,要獲取最近3秒內OrderEvent的price的平均值,那該怎么做呢?一般的做法需要做個后台線程來做3秒的時間統計,時間到了再做后續處理,雖然不復雜,但是也挺繁瑣的。

看看EPL是怎么做的

select avg(price) from test.OrderEvent.win:time(3 sec)

win:time(3 sec)就是定義了3秒的時間窗口,avg(price)就是統計了3秒內的OrderEvent對象的price的平均值

長度窗口: 長度窗口和時間窗口比較類似

select avg(price) from test.OrderEvent.win:length(100)

win:length(10)就是定義了10個Event的,avg(price)就是統計了最近10個的OrderEvent對象的price的平均值

以上這些都比較容易理解,雖然知道了處理方法,也比較好用,我還是比較喜歡鑽研一下他的內部實現方式。先來看一張時間窗口模式的圖

image

他僅保留最近時間窗口的對象內容,但是每個Event到來都會觸發一次UpdateListener的操作

EPL語句會作為一個Statement來監聽事件的到來,當New Events有新事件時就會觸發UpdateListener的操作,下面是一個updateListener的簡單例子,event.get("avg(price))就可以獲得EPL查詢所獲得的price平均值,然后就可以加入自己的代碼進行處理,比如將結果寫入本地文件

而New Events和Old Events就是他的輸入,而ave(price)操作所計算的對象就是Length Window中的內容。

public class MyListener : UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
EventBean event = newEvents[0];
Console.WriteLine("avg=" + event.Get("avg(price)"));
}
}

事件窗口也基本類似。

c)批量窗口處理模式

窗口模式是會在每個Event來都觸發一次UpdateListener操作,如果每秒Event數量達到很大的話這種方式明顯是不行的 CPU消耗會很厲害

批量窗口處理模式正好可以解決這個問題

批量時間窗口模式

select avg(price) from test.OrderEvent.win:time_batch(3 sec)

批量長度窗口模式

select avg(price) from test.OrderEvent.win:length_batch(10)

時間批量模式的操作圖如下

nesperbatchtime

上圖的時間窗口大小為4s,他會在4s的窗口時間到達以后才將窗口中的內容一起扔給UpdateListener來進行處理,性能相對節約很多,特別是大數據量的情況下。長度批量窗口的處理模式也是類似。

上述窗口模式下內存使用情況又是如何呢?經過本人測試和研究代碼發現,它會保留兩個窗口的內存使用量,一個保存當前窗口的Events,一個保存上一個窗口的Events,因此在估算一個數據分析程序占用多少內存要看上面監聽的EPL語句開的窗口的大小以及數據的TPS,防止內存OOM。

掌握了上面的窗口的概念,后面其他的內容都很好理解了

d) 過濾

where過濾

select avg(price) from test.OrderEvent.win:time_batch(3 sec) where price>10

having過濾

select avg(price) from test.OrderEvent.win:time_batch(3 sec) having price>10

似曾相識啊,執行方式也基本和SQL里的where 和 having差不多。

在EPL里where 是在incoming Events到window之間進行過濾,having是在window到New Eventing之間進行過濾

e)聚合

count

select count(price) from test.OrderEvent.win:time_batch(3 sec) where price>10

sum

select sum(price) from test.OrderEvent.win:time_batch(3 sec) where price>10

group by

select itemName,sum(price) from test.OrderEvent.win:time_batch(3 sec) where price>10 group by itemName

都很簡單,了解SQL的都狠容易上手

f) 函數

ESPER默認加載

• System
• System.Collections
• System.Text

支持這些包下的函數方法,例如

select Math.round(sum(price)) from test.OrderEvent.win:time_batch(3 sec) where price>10

它還支持自定義函數,舉個例子,做個計算百分比的函數

public class MyUtilityClass{
public static double computePercent(double amount, double total) {
return amount / total * 100;
}
}

配置一下

<esper-configuration
<plugin-singlerow-function name="percent"
function-class="mycompany.MyUtilityClass" function-method="computePercent" />
</esper-configuration>

OK了,可以用了

select percent(price,total) from OrderEvent

總體來說,ESPER的EPL功能非常強大,而且基本和SQL類似,入門容易,構造一個實時數據分析系統比較簡單,且維護成本低,新應用進來只需要簡單配置一下EPL語句就可以了,方便快捷,對大部分的系統還是比較適合的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM