首先介紹一下NESPER的大體結構,NEsper從內容上分為兩塊,NEsper的核心NEsper.dll和NEsper.IO.dll。
(1)NEsper的核心包包含了EPL語法解析引擎,事件監聽機制,事件處理等核心模塊。
(2)NEsper的io包含從各種數據源讀取數據以及將輸出結果寫入各種數據源,包括excel,database,msmq,http,socket,XML。
貼一張esper官網上的結構圖,方便大家了解esper的結構
接下來對上述結構圖進行詳細的解釋讓大家加深對ESPER的了解
1. Event對象:NESPER處理的事件的最小單位,一個任意的CLR對象,屬性支持簡單的CLR類型、數組、map、以及嵌套POCO,很靈活.
2.EPL:EPL是ESPER的核心,它類似於SQL,但是和SQL的執行方式不同。
SQL是數據在那里,你每次執行SQL就會觸發一次查詢;而EPL是查詢在這里,數據輸入達到一定條件即可觸發查詢。
這個條件可以有多種:
a).每個event對象來就觸發一次查詢,並只處理當前對象
select * from OrderEvent
這個EPL語句會在每個OrderEvent對象到達后,並將該event交給后續的Listener(后面會講到)來進行處理。但是這種用法不多見,意義不大。
b).窗口處理模式:
EPL最大的特色就是這個窗口處理模式,有兩種窗口,時間窗口和長度窗口。
時間窗口 : 大家想一下,如果有一個場景,要獲取最近3秒內OrderEvent的price的平均值,那該怎么做呢?一般的做法需要做個后台線程來做3秒的時間統計,時間到了再做后續處理,雖然不復雜,但是也挺繁瑣的。
看看EPL是怎么做的
win:time(3 sec)就是定義了3秒的時間窗口,avg(price)就是統計了3秒內的OrderEvent對象的price的平均值select avg(price) from test.OrderEvent.win:time(3 sec)
長度窗口: 長度窗口和時間窗口比較類似
win:length(10)就是定義了10個Event的,avg(price)就是統計了最近10個的OrderEvent對象的price的平均值select avg(price) from test.OrderEvent.win:length(100)
以上這些都比較容易理解,雖然知道了處理方法,也比較好用,我還是比較喜歡鑽研一下他的內部實現方式。先來看一張時間窗口模式的圖
他僅保留最近時間窗口的對象內容,但是每個Event到來都會觸發一次UpdateListener的操作
EPL語句會作為一個Statement來監聽事件的到來,當New Events有新事件時就會觸發UpdateListener的操作,下面是一個updateListener的簡單例子,event.get("avg(price))就可以獲得EPL查詢所獲得的price平均值,然后就可以加入自己的代碼進行處理,比如將結果寫入本地文件
而New Events和Old Events就是他的輸入,而ave(price)操作所計算的對象就是Length Window中的內容。
public class MyListener : UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
EventBean event = newEvents[0];
Console.WriteLine("avg=" + event.Get("avg(price)"));
}
}
事件窗口也基本類似。
c)批量窗口處理模式
窗口模式是會在每個Event來都觸發一次UpdateListener操作,如果每秒Event數量達到很大的話這種方式明顯是不行的 CPU消耗會很厲害
批量窗口處理模式正好可以解決這個問題
批量時間窗口模式
批量長度窗口模式select avg(price) from test.OrderEvent.win:time_batch(3 sec)
時間批量模式的操作圖如下select avg(price) from test.OrderEvent.win:length_batch(10)
上圖的時間窗口大小為4s,他會在4s的窗口時間到達以后才將窗口中的內容一起扔給UpdateListener來進行處理,性能相對節約很多,特別是大數據量的情況下。長度批量窗口的處理模式也是類似。
上述窗口模式下內存使用情況又是如何呢?經過本人測試和研究代碼發現,它會保留兩個窗口的內存使用量,一個保存當前窗口的Events,一個保存上一個窗口的Events,因此在估算一個數據分析程序占用多少內存要看上面監聽的EPL語句開的窗口的大小以及數據的TPS,防止內存OOM。
掌握了上面的窗口的概念,后面其他的內容都很好理解了
d) 過濾
where過濾
select avg(price) from test.OrderEvent.win:time_batch(3 sec) where price>10
having過濾
似曾相識啊,執行方式也基本和SQL里的where 和 having差不多。select avg(price) from test.OrderEvent.win:time_batch(3 sec) having price>10
在EPL里where 是在incoming Events到window之間進行過濾,having是在window到New Eventing之間進行過濾
e)聚合
count
select count(price) from test.OrderEvent.win:time_batch(3 sec) where price>10
sum
group byselect sum(price) from test.OrderEvent.win:time_batch(3 sec) where price>10
select itemName,sum(price) from test.OrderEvent.win:time_batch(3 sec) where price>10 group by itemName
都很簡單,了解SQL的都狠容易上手
f) 函數
ESPER默認加載
• System
• System.Collections
• System.Text
支持這些包下的函數方法,例如
select Math.round(sum(price)) from test.OrderEvent.win:time_batch(3 sec) where price>10
它還支持自定義函數,舉個例子,做個計算百分比的函數
配置一下public class MyUtilityClass{
public static double computePercent(double amount, double total) {
return amount / total * 100;
}
}
OK了,可以用了<esper-configuration
<plugin-singlerow-function name="percent"
function-class="mycompany.MyUtilityClass" function-method="computePercent" />
</esper-configuration>
select percent(price,total) from OrderEvent
總體來說,ESPER的EPL功能非常強大,而且基本和SQL類似,入門容易,構造一個實時數據分析系統比較簡單,且維護成本低,新應用進來只需要簡單配置一下EPL語句就可以了,方便快捷,對大部分的系統還是比較適合的。