What is Siddhi?
Siddhi is a lightweight, easy-to-use, open-source CEP (Complex Event Processing) engine developed by WSO2 (http://wso2.com/about/).
Like most CEP systems, Siddhi supports SQL-like queries over streaming data; the SQL-style queries are compiled into Java code.
As one or more data streams flow in, Siddhi Core checks in real time whether the incoming events match the defined queries; when a match occurs, it triggers a Callback that executes the corresponding logic.
How does Siddhi differ from traditional CEP systems such as Esper?
Mainly in being more lightweight and efficient. The higher performance comes from:
- Multi-threading
- Queues and use of pipelining
- Nested queries and chaining streams
- Query optimization and common sub query elimination
The first two points are the crucial ones. Traditional CEP systems such as Esper use a single thread to process all query matching; this is simple, but inefficient, and cannot exploit multi-core CPUs.
Siddhi therefore uses multiple threads combined with a pipeline mechanism, as shown in the figure below.
Siddhi splits each query into independent stages, called processors. The benefits are, first, that this makes multi-threading straightforward, and second, that identical processors can be reused;
the processors are connected to each other by queues. We will not go into further detail here; if you are interested, see the Siddhi paper and documentation.
What can Siddhi do?
Now for the key question: what can Siddhi do for us?
A few concrete examples will illustrate the typical scenarios in which Siddhi is used.
Simple ETL
Let's start with the simplest possible example to see how to run a real Siddhi program.
As mentioned above, Siddhi uses an SQL-like query language.
First, define the stream schema:
define stream TempStream (deviceID long, roomNo int, temp double);
Then define the query:
from TempStream
select roomNo, temp * 9/5 + 32 as temp, 'F' as scale,
       roomNo >= 100 and roomNo < 110 as isServerRoom
insert into RoomTempStream;
Together these implement a complete ETL pipeline:
extraction: select the needed fields out of TempStream;
transform: convert the temperature from Celsius to Fahrenheit;
loading: write the results into the RoomTempStream stream.
Very convenient: there is no extra logic to write, just an SQL-like query.
To make this more concrete, here is a complete Java test example:
SiddhiManager siddhiManager = new SiddhiManager();

String executionPlan = ""
    + "define stream TempStream (deviceID int, roomNo int, temp float);"
    + ""
    + "@info(name = 'query1') "
    + "from TempStream "
    + "select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, "
    + "roomNo >= 100 and roomNo < 110 as isServerRoom "
    + "insert into RoomTempStream;";

ExecutionPlanRuntime executionPlanRuntime =
        siddhiManager.createExecutionPlanRuntime(executionPlan);

executionPlanRuntime.addCallback("query1", new QueryCallback() {
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        EventPrinter.print(timeStamp, inEvents, removeEvents);
    }
});

InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
executionPlanRuntime.start();

inputHandler.send(new Object[] {12344, 201, 28.2f});
inputHandler.send(new Object[] {12345, 202, 22.2f});
inputHandler.send(new Object[] {12346, 203, 24.2f});

// Shutting down the runtime
executionPlanRuntime.shutdown();
// Shutting down Siddhi
siddhiManager.shutdown();
Window-based aggregation
Siddhi supports many types of windows; for details see https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-time
Here is the most basic example, a time-based window:
from TempStream#window.time(1 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert all events into AvgRoomTempStream;
This query computes the average temperature of each room over a 1-minute sliding window.
Siddhi's time windows can also aggregate by an externally supplied timestamp, but they require that timestamp to be monotonically increasing; in this respect our Brain aggregation library is more general, since it also handles non-monotonic timestamps.
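As a sketch of that external-time mode, based on Siddhi's inbuilt externalTime window (the eventTime attribute here is an assumption added to the stream schema), the aggregation above would become:

```sql
-- Assumed schema: TempStream now carries its own event timestamp.
define stream TempStream (deviceID long, roomNo int, temp double, eventTime long);

-- The window advances by the eventTime attribute instead of wall-clock time;
-- Siddhi requires eventTime to be monotonically increasing.
from TempStream#window.externalTime(eventTime, 1 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert all events into AvgRoomTempStream;
```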
Joining multiple streams
Siddhi supports real-time, window-based joins across multiple streams:
from TempStream[temp > 30.0]#window.time(1 min) as T
join RegulatorStream[isOn == false]#window.length(1) as R
on T.roomNo == R.roomNo
select T.roomNo, T.temp, R.deviceID, 'start' as action
insert into RegulatorActionStream;
The query above joins TempStream and RegulatorStream on roomNo.
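Driving such a join from Java follows the same pattern as the earlier test example, except that the runtime now exposes one InputHandler per defined stream. A minimal sketch, assuming the stream schemas below and a method that declares throws InterruptedException:

```java
SiddhiManager siddhiManager = new SiddhiManager();

String plan = ""
    + "define stream TempStream (roomNo int, temp double);"
    + "define stream RegulatorStream (deviceID long, roomNo int, isOn bool);"
    + "from TempStream[temp > 30.0]#window.time(1 min) as T "
    + "join RegulatorStream[isOn == false]#window.length(1) as R "
    + "on T.roomNo == R.roomNo "
    + "select T.roomNo, T.temp, R.deviceID, 'start' as action "
    + "insert into RegulatorActionStream;";

ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(plan);

// Listen on the output stream of the join
runtime.addCallback("RegulatorActionStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        EventPrinter.print(events);
    }
});

// One InputHandler per defined stream
InputHandler tempHandler = runtime.getInputHandler("TempStream");
InputHandler regulatorHandler = runtime.getInputHandler("RegulatorStream");
runtime.start();

// A regulator that is off, then a hot reading in the same room, should join
regulatorHandler.send(new Object[] {101L, 201, false});
tempHandler.send(new Object[] {201, 35.0});

runtime.shutdown();
siddhiManager.shutdown();
```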
Pattern Query
This type of query best demonstrates the power of CEP. What is a pattern query?
“Pattern allows event streams to be correlated over time and detect event patterns based on the order of event arrival.”
Let's look at an example that uses a pattern query to detect credit card/ATM transaction fraud:
from every a1 = atmStatsStream[amountWithdrawed < 100]
    -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo]
    within 1 day
select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName,
       b1.amountWithdrawed as amountWithdrawed, b1.location as location,
       b1.cardHolderMobile as cardHolderMobile
insert into possibleFraudStream;
Note the '->' symbol: it expresses the order in which events occur.
The query above means: if, within one day, a withdrawal of less than 100 is followed by a withdrawal of more than 10000 on the same card, this is flagged as possible fraud.
Of course, this is just an illustration, not a claim that it really detects fraud; you can use it as a template for much more complex queries.
Sequence Query
The difference from a pattern is that the events in a pattern may be non-consecutive, while the events in a sequence must arrive consecutively.
Here is an example that uses a sequence to detect peaks in a stock price:
from every e1=FilteredStockStream[price>20],
e2=FilteredStockStream[((e2[last].price is null) and price>=e1.price) or ((not (e2[last].price is null)) and price>=e2[last].price)],
e3=FilteredStockStream[price<e2[last].price]
select e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak
insert into PeakStream;
The query above means:
e1: an event arrives with price > 20;
e2: every subsequent event's price is greater than or equal to the previous event's price;
e3: finally, an event arrives whose price is lower than the previous event's price.
At that point we have found a peak. For example, given the prices 21, 23, 26, 24, the query matches e1 = 21, e2 = [23, 26], e3 = 24 and emits (21, 26, 24).
Siddhi has many other features that we will not cover one by one here.
Integrating with Storm
Finally, let's see how to integrate Siddhi into our current framework, so that it serves as a complement to Brain.
I wrapped Siddhi Core as a Siddhi Bolt, so that within a JStorm topology you can flexibly choose the approach per computation: do some of the statistics with Brain and some with Siddhi. Very simple.
Without further ado, here is the source code for reference:
public class SiddhiBolt implements IRichBolt {
    private static final Logger LOG = Logger.getLogger(SiddhiBolt.class);

    protected OutputCollector collector;
    protected SiddhiManager siddhiManager = null;
    protected String executionPlan = null;
    ExecutionPlanRuntime executionPlanRuntime = null;
    protected HashMap<String, InputHandler> handlers = null;

    public SiddhiBolt(String plan) {
        this.executionPlan = plan;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.siddhiManager = new SiddhiManager();
        this.executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
        addCallbacks();
        handlers = new HashMap<String, InputHandler>();
        executionPlanRuntime.start();
    }

    @Override
    public void execute(Tuple tuple) {
        String inputStream = tuple.getSourceStreamId();
        InputHandler inputHandler = getInputHandler(inputStream);
        List<Object> values = tuple.getValues();
        Object[] objects = values.toArray();
        try {
            inputHandler.send(objects);
        } catch (Exception e) {
            LOG.error("Send stream event error: ", e);
        }
        // collector.fail(tuple); // test replay
        collector.ack(tuple); // remember to ack the tuple
        // Make sure to add the anchor tuple if you want to track it:
        // collector.emit(streamId, tuple, new Values(counters, now));
    }

    public InputHandler getInputHandler(String streamName) {
        InputHandler handler = null;
        if (handlers.containsKey(streamName)) {
            handler = handlers.get(streamName);
        } else {
            handler = executionPlanRuntime.getInputHandler(streamName);
            if (handler != null) {
                handlers.put(streamName, handler);
            }
        }
        return handler;
    }

    // Override this to register callbacks for your own queries/streams
    public void addCallbacks() {
        // StreamCallback example
        executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                LOG.info("receive events: " + events.length);
                for (Event e : events) {
                    LOG.info(e);
                }
            }
        });

        // QueryCallback example
        executionPlanRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                printEvents(timeStamp, inEvents, removeEvents);
            }
        });
    }

    public void printEvents(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        StringBuilder sb = new StringBuilder();
        sb.append("Events{ @timeStamp = ").append(timeStamp)
          .append(", inEvents = ").append(Arrays.deepToString(inEvents))
          .append(", RemoveEvents = ").append(Arrays.deepToString(removeEvents))
          .append(" }");
        LOG.info(sb.toString());
    }

    @Override
    public void cleanup() {
        // Shutting down the runtime
        executionPlanRuntime.shutdown();
        // Shutting down Siddhi
        siddhiManager.shutdown();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare output streams here if the bolt re-emits Siddhi results downstream
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
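Hooking the bolt into a topology is then just ordinary Storm wiring. A hypothetical sketch (TempSpout and the parallelism are placeholders, not part of the original code): since execute() uses tuple.getSourceStreamId() as the Siddhi stream name, the upstream component must declare a stream named exactly like the one defined in the execution plan.

```java
String plan = ""
    + "define stream TempStream (deviceID int, roomNo int, temp float);"
    + "@info(name = 'query1') "
    + "from TempStream#window.time(1 min) "
    + "select roomNo, avg(temp) as avgTemp "
    + "group by roomNo "
    + "insert into outputStream;";

TopologyBuilder builder = new TopologyBuilder();
// TempSpout is a placeholder; it must emit on a stream declared as "TempStream"
builder.setSpout("temp-spout", new TempSpout());
builder.setBolt("siddhi-bolt", new SiddhiBolt(plan), 2)
       .shuffleGrouping("temp-spout", "TempStream");
```

Because the bolt resolves the InputHandler from the tuple's source stream id, a single SiddhiBolt instance can consume several differently named streams, as long as each one is defined in the execution plan.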
Reference
1. Siddhi paper, https://people.apache.org/~hemapani/research/papers/siddi-gce2011.pdf
2. Siddhi doc, https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0


