來自小韓
什么是Esper
想要認識Esper,先要了解CEP(Complex Event Processing),到處都有,並且各方理解也有偏差,我就不贅述了。
Esper就是CEP的一個java的開源實現。
Esper官方網址:http://www.espertech.com/
Esper的特性
在探究Esper特性之前,我們先總結一下復雜事件的特性:
- 類型多樣,不易建模
- 場景不可控,隨時可能新增場景
- 邏輯復雜,難於描述
- 既需要匯總規律,又不准數據落地。
場景舉例:
股市實時K線圖、網站惡意操作監測、用戶登入控制、實時數據統計等
針對復雜事件的這些特性,Esper具有相應的性質:
- 實時響應
- 極速擴展新事件
- 語法描述能力強大、類sql
事件和處理流程
一個基本的例子
引用Esper就不說了,直接搜maven esper,添加以來就好。
/*定義事件模型*/
public class Coder {
private String Name;
private int age;
private double Salary;
//省略getter/setter
}
//
public class HelloEsperApp {
public static void main(String[] args) throws InterruptedException {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
//指定事件模型
String coderModel = Coder.class.getName();
//描述復雜事件
String epl = "select name,salary,age from " + coderModel;
EPStatement state = admin.createEPL(epl);
//添加事后處理
state.addListener(new HelloEsperListener());
EPRuntime runtime = epService.getEPRuntime();
//模擬事件發生
for (int i = 0; i < 10; i++) {
Coder coder = new Coder();
coder.setName("coder"+i);
coder.setAge(20+i);
runtime.sendEvent(coder);
}
}
}
import com.espertech.esper.client.UpdateListener;
/** 實現UpdateListener接口,來定義事件的后置處理過程 **/
public class HelloEsperListener implements UpdateListener {
public void update(EventBean[] arg0, EventBean[] arg1) {
try {
System.out.println("coder: name-"+arg0[0].get("name") + " age-"+arg0[0].get("age"));
}catch(Exception e) {
e.printStackTrace();
}
}
小心的分享:筆者認為任何一個項目的學習都可以分為兩步,1.看他暴露了多少給你(用起來是否清晰);2.看他隱藏了多少細節(實現得是否精致)
一張圖簡單描述一下Esper給我們暴露了什么:
其中,后置處理方式的subscribe本文暫不提及。
Esper支持的模型類型
前面的例子中,我們使用一個bean作為事件模型。這樣做很多時候都存在局限性,當有新的事件模型需要被加入時,我們必須要定義新的類,實際上,在實際應用中,bean很少被使用。
Esper支持豐富的事件模型定義方式,包括java類、Map、List、XML。
例如使用Map來定義事件模型:
Map<String, Object> blog = new HashMap<String, Object>();
blog.put("url", String.class);
blog.put("title", String.class);
Map<String, Object> coder = new HashMap<String, Object>();
coder.put("name", String.class);
coder.put("salary", int.class);
coder.put("friends", List.class);
coder.put("blogKey", "Blog");
// 注冊blog模型到esper
admin.getConfiguration().addEventType("Blog", blog);
// 注冊coder到Esper
admin.getConfiguration().addEventType("Coder", coder);
這之后,我們就可以對Coder事件模型進行EPL事件描述了。
EPL語法
關於語法,我們大致講一下,到能幫助大家整體理解Esper的程度,之后會單獨寫文章來分享語法技巧。
基本語法
以上面的Coder模型為例,類SQL部分,我們快速帶過:
#實時計算出當前進入引擎的所有程序員的平均工資、最高工資。
select name,Blog.title,salary,avg(salary) as avgSalary,max(salary) as maxSalary from Coder where salary>1000;
#其他的例如 and、or、not、in、order by、group by、having、join等基本都與SQL標准一致
與SQL不同的地方,主要體現了實時性,與sql的持久數據形成對比我們列四點最重要的學習一下:insert、窗格、context、pattern。
insert
首先用SQL的視角去想象,好像把數據保存起來這樣的動作在Esper這樣實時處理的工具中好像確實沒有場景。
實際上insert在Esper中做的是轉發的角色。即把某事件模型類的事件經過EPL運算后,insert成另外一種事件,去觸發另外事件的處理流程。看例子:
//在前文Coder的基礎上,給Coder加上團隊標志,再定義一個Team事件模型
Map<String, Object> coder = new HashMap<String, Object>();
coder.put("name", String.class);
coder.put("salary", int.class);
coder.put("teamName", String.class);
Map<String, Object> team = new HashMap<String, Object>();
team.put("teamName", String.class);
team.put("totalSalary", int.class);
admin.getConfiguration().addEventType("Coder", coder);
admin.getConfiguration().addEventType("Team", team);
String epl4CoderIn = "insert Team(teamName, totalSalary) select teanName,sum(salary) as totalSalary from Coder groupby teamName";
String epl4TeamIn = "select teamName, totalSalary from Team";
EPStatement state = admin.createEPL(epl4CoderIn);
EPStatement state = admin.createEPL(epl4TeamIn);
state.addListener(new TeamEventListener());
這時,每當有Coder類型的事件流入,就轉化為新的Team事件去觸發Team的流程。Esper可以基於此實現事件流。
窗格
窗格應該是最有別於SQL的特性了,Esper的事件收集器支持事件積攢,分為兩種積攢方式,時間和數量。
-
時間窗格
select name, avg(salary) from Coder.win:time(5 sec);
計算5秒內進入引擎的Coder的平均工資,看圖理解:
每有新的事件進入,就會觸發事件收集器回溯5秒,找到這五秒內收到的所有事件,並一同發給后置處理器。 -
時間批量窗格
select name, avg(salary) from Coder.win:time_batch(5 sec);
看圖理解:
在事件批量窗格中,事件的進入不觸發后置處理,只有滿了一個批量單位才會進行提交。例如在股市K線圖的實時繪制中就采用這種機制。
事件窗格支持的時間定義,小到毫秒大到年,包括msec、sec、min、hour、day、week、month、year。例如我們要定義一個周期為一天5小時20分的窗格,可以寫作:
1 day 5 hour 20 min -
數量窗格
數量窗格跟事件窗格一樣也分普通和批量,我們先來看一下語法例子:
select name, avg(salary) from Coder.win:length(5);
select name, avg(salary) from Coder.win:length_batch(5);
非批量情況下,事件進入就會觸發收集引擎回溯最近的五個事件,批量提交給后置處理器;批量情況下則是,每滿5個事件才提交一次,不滿則等待。
context
即使是同樣的事件,也有很多情況下需要分組處理,context為Esper提供一種組別定義的能力,從而讓事件流入不同的處理組中我們結合前面的窗格舉一個實際的例子。
場景:游戲對局匹配
規則:十個人一組,進行5V5游戲,玩家按等級分為青銅、白銀、黃金、白金、鑽石、王者六種,為了保證游戲競技性,只有級別相同的玩家才能共同進行一局游戲。
如果只是十個人對戰,那個很簡單,我們只需要按照前文介紹的,數量批量提交窗口,定義一個容量為10的窗格,每滿十個,就交給后置處理器去創建對局。但是現在的場景又加入了玩家水平等級制,這時,就需要Context幫忙了。
// 省略Player模型的創建過程
// 創建context以Player的level屬性作為組分割條件
String createContext = "create context forLevel partition by level from Player";
// 帶上context前綴
String match = "context forLevel select * from Player.win:length_batch(10)";
admin.createEPL(createContext);
EPStatement state = admin.createEPL(match);
state.addListener(new StartGameListener());
pattern
pattern具有極強大的邏輯描述能力,本文暫不介紹,后續獨立發文。
進程模型分析
其實看過了前面的內容這一部分,我們先搬官網的圖出來看看:
不知道大家什么感覺,總之我是不願意看的...我們來翻譯一下第一張圖:
官網給出的第二張事件流收集原理實際上可以看做是引擎入口處的EPL語句的解析過程。引擎中將事件分了三級緩存:
入門先到這里!