https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows
http://wso2.com/library/articles/2013/06/understanding-siddhi-powers-wso2-cep-2x/
https://docs.wso2.com/display/CEP400/Samples+on+Processing+Events
The window mechanism is somewhat obscure and the official examples are thin, so this post goes through it in detail.
Basic syntax:
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, <parameter>, ... )
select <attribute name>, <attribute name>, ...
insert [current events | expired events | all events] into <output stream name>
window.length
Let's go straight to an example. It outputs expired events to show how the window behaves, although in practice expired events are rarely what you want.
String executionPlan =
    "define stream cseEventStream (symbol string, price float, volume long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.length(6) " +
    "select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
    "group by symbol " +
    "insert expired events into outputStream;";
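A brief explanation:
define: declares the stream, i.e. the structure of each event in it.
@info: optional, names the query.
The query: for cseEventStream, events with price < 700 go into a window of length 6.
Once the window would hold more than 6 events, the oldest one becomes an expired event, and that triggers the insert.
What gets inserted is determined by the select clause.
I feed in the following stream data: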
int i = 0;
while (i < 10) {
    float p = i * 10;
    // volume is declared as long in the stream definition, hence 100L
    inputHandler.send(new Object[]{"WSO2", p, 100L});
    System.out.println("\"WSO2\", " + p);
    inputHandler.send(new Object[]{"IBM", p, 100L});
    System.out.println("\"IBM\", " + p);
    Thread.sleep(1000);
    i++;
}
Part of the output:
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
receive events: 1
Event{timestamp=1447906176329, data=[WSO2, 0.0, 15.0, 30.0, 2], isExpired=false}
"IBM", 30.0
receive events: 1
Event{timestamp=1447906176331, data=[IBM, 0.0, 15.0, 30.0, 2], isExpired=false}
"WSO2", 40.0
receive events: 1
Event{timestamp=1447906177331, data=[WSO2, 10.0, 25.0, 50.0, 2], isExpired=false}
"IBM", 40.0
receive events: 1
Event{timestamp=1447906177331, data=[IBM, 10.0, 25.0, 50.0, 2], isExpired=false}
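This illustrates a few points:
1. The window length is 6, so sending the 7th event triggers an expiry.
2. At that moment outputStream receives the expired event.
3. From that event we get all of its own attributes, and through aggregate functions we also get statistics over the events currently in the window.
This is the hard part to grasp: the event delivered is only the expired one, and the other events in the window are not visible, yet aggregate functions still compute over the window's contents.
Here we compute three statistics (avg, sum, count), which makes it possible to see how avg is derived.
Take Event{timestamp=1447906176329, data=[WSO2, 0.0, 15.0, 30.0, 2], isExpired=false} as an example.
Because of the group by, only events with symbol = WSO2 are counted.
Sending "WSO2", 30.0 expires "WSO2", 0.0. At the moment the statistics are computed, both of these events are excluded, and the events taking part are:
"IBM", 0.0 "WSO2", 10.0 "IBM", 10.0 "WSO2", 20.0 "IBM", 20.0
So for WSO2, count is 2 (the events priced 10.0 and 20.0), sum is 30, and avg is 15.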
Without the group by, the result is:
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
receive events: 1
Event{timestamp=1447913986723, data=[WSO2, 0.0, 12.0, 60.0, 5], isExpired=false}
"IBM", 30.0
receive events: 1
Event{timestamp=1447913986725, data=[IBM, 0.0, 18.0, 90.0, 5], isExpired=false}
Now the symbol is ignored and everything in the window is aggregated together: the 5 remaining events sum to 60, so avg is 12.
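The numbers in both runs can be reproduced with a small stand-alone model. This is only a sketch, not Siddhi code: the class LengthWindowSketch and its methods are hypothetical names, and it assumes that the expired event is removed from the aggregates before the arriving event is added, which is what the observed output suggests.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone model of a sliding length window; not Siddhi code.
class LengthWindowSketch {
    private final int length;
    private final boolean groupBySymbol;
    private final Deque<Object[]> window = new ArrayDeque<>();   // {symbol, price}
    private final Map<String, double[]> stats = new HashMap<>(); // key -> {sum, count}

    LengthWindowSketch(int length, boolean groupBySymbol) {
        this.length = length;
        this.groupBySymbol = groupBySymbol;
    }

    private double[] statsFor(String symbol) {
        return stats.computeIfAbsent(groupBySymbol ? symbol : "*", k -> new double[2]);
    }

    // Feeds one event; returns the "expired events" row, or null if nothing expired.
    String send(String symbol, float price) {
        String out = null;
        if (window.size() == length) {
            Object[] old = window.removeFirst();
            double[] s = statsFor((String) old[0]);
            s[0] -= (Float) old[1];
            s[1] -= 1;
            // Assumption: aggregates are read here, before the arriving event is counted.
            double avg = s[1] == 0 ? 0.0 : s[0] / s[1];
            out = old[0] + ", " + old[1] + ", " + avg + ", " + s[0] + ", " + (long) s[1];
        }
        window.addLast(new Object[]{symbol, price});
        double[] s = statsFor(symbol);
        s[0] += price;
        s[1] += 1;
        return out;
    }

    // Sends five WSO2/IBM pairs priced 0, 10, 20, 30, 40 through a window of length 6.
    static List<String> run(boolean groupBySymbol) {
        LengthWindowSketch w = new LengthWindowSketch(6, groupBySymbol);
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            for (String symbol : new String[]{"WSO2", "IBM"}) {
                String row = w.send(symbol, i * 10f);
                if (row != null) rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        run(true).forEach(System.out::println);   // with group by symbol
        run(false).forEach(System.out::println);  // without group by
    }
}
```

With grouping, the first row is the WSO2 event priced 0.0 with avg 15, sum 30 and count 2; without grouping, the same event carries avg 12, sum 60 and count 5.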
expired events is one of three options; the others are current events and all events.
expired events fires when an event expires, current events fires when an event arrives, and all events fires in both cases.
Let's see what happens with all events. In my test the result was the same as with current events: output fired only when an event arrived. A bug?
"WSO2", 10.0
"IBM", 10.0
receive events: 1
Event{timestamp=1447914310502, data=[WSO2, 10.0, 5.0, 10.0, 2], isExpired=false}
receive events: 1
Event{timestamp=1447914310502, data=[IBM, 10.0, 5.0, 10.0, 2], isExpired=false}
"WSO2", 20.0
"IBM", 20.0
receive events: 1
Event{timestamp=1447914311503, data=[WSO2, 20.0, 10.0, 30.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914311503, data=[IBM, 20.0, 10.0, 30.0, 3], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
receive events: 1
Event{timestamp=1447914312503, data=[WSO2, 30.0, 20.0, 60.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914312503, data=[IBM, 30.0, 20.0, 60.0, 3], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
receive events: 1
Event{timestamp=1447914313503, data=[WSO2, 40.0, 30.0, 90.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914313503, data=[IBM, 40.0, 30.0, 90.0, 3], isExpired=false}
window.time
This works like window.length, except that expiry is driven by time.
String executionPlan =
    "define stream cseEventStream (symbol string, price float, volume long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.time(2 sec) " +
    "select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
    "group by symbol " +
    "insert expired events into outputStream;";
The result:
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
receive events: 1
Event{timestamp=1447915287974, data=[WSO2, 0.0, 10.0, 10.0, 1], isExpired=false}
receive events: 1
Event{timestamp=1447915287977, data=[IBM, 0.0, 15.0, 30.0, 2], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
receive events: 2
Event{timestamp=1447915288975, data=[WSO2, 10.0, 20.0, 20.0, 1], isExpired=false}
Event{timestamp=1447915288975, data=[IBM, 10.0, 20.0, 20.0, 1], isExpired=false}
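As you can see, expiry here is time-based, so it is not necessarily evaluated when an event arrives; it is driven by a scheduled timer.
The statistics therefore depend on how many events are in the window when the timer fires, which is why the count above is sometimes 1 and sometimes 2.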
window.lengthBatch;timeBatch
These windows do not slide. An example:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, volume long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.lengthBatch(4) " +
    "select symbol, price " +
    "insert expired events into outputStream;";
With the same input as above, the result is:
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 4
Event{timestamp=1447923776094, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447923776094, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447923776094, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447923776094, data=[IBM, 10.0], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 4
Event{timestamp=1447923778094, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447923778094, data=[IBM, 20.0], isExpired=false}
Event{timestamp=1447923778094, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1447923778094, data=[IBM, 30.0], isExpired=false}
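With lengthBatch set to 4, the first expiry only happens once 8 events have arrived: a batch expires when the next batch completes.
Expiry happens one batch at a time, so each output carries 4 events with no overlap between outputs. The window does not slide.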
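The timing of batch expiry can be sketched as follows. This is a hypothetical model (LengthBatchSketch is not a Siddhi class) that assumes a completed batch is held until the next batch fills up and is then expired as a whole, which matches the run above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a non-sliding length batch window; not Siddhi code.
class LengthBatchSketch {
    private final int batchSize;
    private List<String> filling = new ArrayList<>();   // batch currently being collected
    private List<String> previous = new ArrayList<>();  // last completed batch

    LengthBatchSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns the events that expire on this arrival (empty unless a batch just completed).
    List<String> send(String event) {
        filling.add(event);
        if (filling.size() < batchSize) {
            return new ArrayList<>();
        }
        List<String> expired = previous;  // the whole previous batch expires at once
        previous = filling;
        filling = new ArrayList<>();
        return expired;
    }

    // Sends the given number of WSO2/IBM pairs and collects every non-empty expiry.
    static List<List<String>> run(int pairs) {
        LengthBatchSketch w = new LengthBatchSketch(4);
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            for (String symbol : new String[]{"WSO2", "IBM"}) {
                List<String> expired = w.send(symbol + " " + (i * 10f));
                if (!expired.isEmpty()) outputs.add(expired);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        run(6).forEach(System.out::println);
    }
}
```

Sending six pairs (12 events) yields two expiries, after the 8th and the 12th event, each carrying four distinct events.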
Now a timeBatch example, this time with all events:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, volume long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.timeBatch(3 sec) " +
    "select symbol, price " +
    "insert all events into outputStream;";
The result is below. We sleep 1 s after each pair of events, so the first expiry fires after six pairs have been sent and expires 6 events.
Besides expiry, output is also triggered when events arrive, because this time we use all events.
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
receive events: 6
Event{timestamp=1447924146613, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447924146614, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447924147614, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447924147614, data=[IBM, 10.0], isExpired=false}
Event{timestamp=1447924148614, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447924148614, data=[IBM, 20.0], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 12
Event{timestamp=1447924152571, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447924152571, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 10.0], isExpired=false}
Event{timestamp=1447924152571, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 20.0], isExpired=false}
Event{timestamp=1447924149614, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1447924149614, data=[IBM, 30.0], isExpired=false}
Event{timestamp=1447924150614, data=[WSO2, 40.0], isExpired=false}
Event{timestamp=1447924150614, data=[IBM, 40.0], isExpired=false}
Event{timestamp=1447924151614, data=[WSO2, 50.0], isExpired=false}
Event{timestamp=1447924151614, data=[IBM, 50.0], isExpired=false}
In this scenario, though, what we usually want is statistics over each batch. For example:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, volume long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.lengthBatch(4) " +
    "select symbol, price, avg(price) as avgPrice " +
    "group by symbol " +
    "insert into outputStream;";
The result:
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
receive events: 2
Event{timestamp=1447991871794, data=[WSO2, 10.0, 5.0], isExpired=false}
Event{timestamp=1447991871794, data=[IBM, 10.0, 5.0], isExpired=false}
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 2
Event{timestamp=1447991873795, data=[WSO2, 30.0, 25.0], isExpired=false}
Event{timestamp=1447991873795, data=[IBM, 30.0, 25.0], isExpired=false}
The events in a batch can be grouped and averaged.
Note: do not use expired events here, or the aggregate will always be 0, because once a batch has expired the window is empty.
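A sketch of the per-batch average, assuming one output row per symbol per batch that carries the last price seen for that symbol, and aggregates that start from zero for every batch. BatchAverageSketch is a hypothetical helper, not part of Siddhi.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "lengthBatch + group by + avg" on current events; not Siddhi code.
class BatchAverageSketch {
    // Splits the events into batches of batchSize and returns, for each completed batch,
    // one "symbol, lastPrice, avgPrice" row per symbol in that batch.
    static List<String> run(String[] symbols, float[] prices, int batchSize) {
        List<String> rows = new ArrayList<>();
        Map<String, float[]> groups = new LinkedHashMap<>(); // symbol -> {sum, count, last}
        int inBatch = 0;
        for (int i = 0; i < symbols.length; i++) {
            float[] g = groups.computeIfAbsent(symbols[i], k -> new float[3]);
            g[0] += prices[i];
            g[1] += 1;
            g[2] = prices[i];
            if (++inBatch == batchSize) {
                for (Map.Entry<String, float[]> e : groups.entrySet()) {
                    float[] v = e.getValue();
                    rows.add(e.getKey() + ", " + v[2] + ", " + (v[0] / v[1]));
                }
                groups.clear();  // the batch expires as a whole, so the next one starts empty
                inBatch = 0;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        String[] symbols = {"WSO2", "IBM", "WSO2", "IBM", "WSO2", "IBM", "WSO2", "IBM"};
        float[] prices = {0, 0, 10, 10, 20, 20, 30, 30};
        run(symbols, prices, 4).forEach(System.out::println);
    }
}
```

For the first batch this gives an average of 5.0 for each symbol, and 25.0 for the second, as in the run above.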
window.externalTime
https://docs.wso2.com/display/CEP400/Sample+0114+-+Using+External+Time+Windows
This one is quite useful: the window slides on an external timestamp. Most of the time we want to aggregate by collection time rather than arrival time.
The limitation is that the external time must be increasing, and in real scenarios strict ordering cannot always be guaranteed.
An example:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.externalTime(time, 3 sec) " +
    "select symbol, price, time, sum(price) as ap, count(price) as cp " +
    "group by symbol " +
    "insert expired events into outputStream;";
The sending code:
int i = 0;
long time = 1447921187000L;
while (i < 10) {
    float p = i * 10;
    inputHandler.send(new Object[]{"WSO2", p, time});
    System.out.println("\"WSO2\", " + p + ", " + time);
    inputHandler.send(new Object[]{"IBM", p, time});
    System.out.println("\"IBM\", " + p + ", " + time);
    Thread.sleep(1000);
    i++;
    time = time + 1000;  // advance the external clock by one second per pair
}
The goal is a sliding window driven by the external time attribute. The result:
"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
receive events: 2
Event{timestamp=1447921190000, data=[WSO2, 0.0, 1447921187000, 30.0, 2], isExpired=false}
Event{timestamp=1447921190000, data=[IBM, 0.0, 1447921187000, 30.0, 2], isExpired=false}
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1447921191000, data=[WSO2, 10.0, 1447921188000, 50.0, 2], isExpired=false}
Event{timestamp=1447921191000, data=[IBM, 10.0, 1447921188000, 50.0, 2], isExpired=false}
Based on the time passed in, the 3-second expiry fires when "WSO2", 30.0, 1447921190000 is received.
In every other respect it behaves like an ordinary sliding window.
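The behaviour can be modelled without any wall clock, since the only clock is the event's own time attribute. The sketch below is hypothetical (ExternalTimeWindowSketch is not a Siddhi class) and assumes that an event expires once an arriving event's time is at least the window length later, and that expired events leave the aggregates before the arriving event is added.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a sliding window driven by an external timestamp; not Siddhi code.
class ExternalTimeWindowSketch {
    private final long windowMillis;
    private final Deque<Object[]> window = new ArrayDeque<>();   // {symbol, price, time}
    private final Map<String, double[]> stats = new HashMap<>(); // symbol -> {sum, count}

    ExternalTimeWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns "symbol, price, time, sum, count" rows for the events expired by this arrival.
    List<String> send(String symbol, float price, long time) {
        List<String> expired = new ArrayList<>();
        while (!window.isEmpty()) {
            Object[] old = window.peekFirst();
            if ((Long) old[2] + windowMillis > time) break;  // oldest event is still inside
            window.removeFirst();
            double[] s = stats.get((String) old[0]);
            s[0] -= (Float) old[1];
            s[1] -= 1;
            expired.add(old[0] + ", " + old[1] + ", " + old[2] + ", " + s[0] + ", " + (long) s[1]);
        }
        window.addLast(new Object[]{symbol, price, time});
        double[] s = stats.computeIfAbsent(symbol, k -> new double[2]);
        s[0] += price;
        s[1] += 1;
        return expired;
    }

    public static void main(String[] args) {
        ExternalTimeWindowSketch w = new ExternalTimeWindowSketch(3000);
        long time = 1447921187000L;
        for (int i = 0; i < 5; i++, time += 1000) {
            for (String symbol : new String[]{"WSO2", "IBM"}) {
                w.send(symbol, i * 10f, time).forEach(System.out::println);
            }
        }
    }
}
```

Because nothing happens between arrivals, a stream that stops sending never expires anything in this model, which is one practical consequence of relying on external time.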
window.cron
https://docs.wso2.com/display/CEP400/Sample+0115+-+Quartz+scheduler+based+alerts
A scheduled job. timeBatch can do the same thing, but cron is more convenient.
Example:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.cron('*/4 * * * * ?') " +
    "select symbol, time, sum(price) as ap, count(price) as cp " +
    "group by symbol " +
    "insert into outputStream;";
The key is understanding cron syntax; see http://www.cnblogs.com/wangyuyu/p/4230742.html
Siddhi's cron syntax has an extra seconds field, so the first field is seconds; */4 means fire every 4 seconds.
The result is below; output does fire every 4 seconds.
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448006719652, data=[WSO2, 1447921191000, 100.0, 4], isExpired=false}
Event{timestamp=1448006719652, data=[IBM, 1447921191000, 100.0, 4], isExpired=false}
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
receive events: 2
Event{timestamp=1448006723653, data=[WSO2, 1447921195000, 260.0, 4], isExpired=false}
Event{timestamp=1448006723653, data=[IBM, 1447921195000, 260.0, 4], isExpired=false}
window.unique, window.firstUnique
They do what their names suggest. An example:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.unique(symbol) " +
    "select symbol, price, time " +
    "insert into outputStream;";
The result looks just like the plain stream passing through,
because every update for a symbol triggers an event:
"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
receive events: 2
Event{timestamp=1448009613618, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448009613620, data=[IBM, 0.0, 1447921187000], isExpired=false}
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
receive events: 1
Event{timestamp=1448009614633, data=[WSO2, 10.0, 1447921188000], isExpired=false}
receive events: 1
Event{timestamp=1448009614633, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
receive events: 2
Event{timestamp=1448009615650, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448009615650, data=[IBM, 20.0, 1447921189000], isExpired=false}
"WSO2", 30.0, 1447921190000
receive events: 1
"IBM", 30.0, 1447921190000
Event{timestamp=1448009616650, data=[WSO2, 30.0, 1447921190000], isExpired=false}
receive events: 1
Event{timestamp=1448009616650, data=[IBM, 30.0, 1447921190000], isExpired=false}
Now firstUnique:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.firstUnique(symbol) " +
    "select symbol, price, time " +
    "insert into outputStream;";
In the result, output fires only the first time a symbol appears:
"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
receive events: 1
Event{timestamp=1448008769827, data=[WSO2, 0.0, 1447921187000], isExpired=false}
receive events: 1
Event{timestamp=1448008769831, data=[IBM, 0.0, 1447921187000], isExpired=false}
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
"WSO2", 90.0, 1447921196000
"IBM", 90.0, 1447921196000
These windows are often used together with a join, for example:
from SymbolStream#window.length(1) unidirectional join StockExchangeStream#window.unique("symbol")
insert into StockQuote StockExchangeStream.symbol as symbol, StockExchangeStream.price as lastTradedPrice
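To make the difference between unique and firstUnique concrete, and to show why unique suits a join that needs the latest price per symbol, here is a small model. It is a sketch under assumptions: UniqueWindowSketch and its methods are hypothetical names, and a Map keyed by symbol stands in for the window contents that a join would read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the unique and firstUnique windows; not Siddhi code.
class UniqueWindowSketch {
    // unique: every arrival is emitted; the window keeps only the latest event per key.
    static List<String> unique(String[] keys, float[] prices, Map<String, Float> window) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            window.put(keys[i], prices[i]);  // replaces the older event with the same key
            emitted.add(keys[i] + " " + prices[i]);
        }
        return emitted;
    }

    // firstUnique: only the first arrival per key is kept and emitted.
    static List<String> firstUnique(String[] keys, float[] prices, Map<String, Float> window) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            if (window.putIfAbsent(keys[i], prices[i]) == null) {
                emitted.add(keys[i] + " " + prices[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] keys = {"WSO2", "IBM", "WSO2", "IBM"};
        float[] prices = {0f, 0f, 10f, 10f};
        Map<String, Float> latest = new HashMap<>();
        Map<String, Float> first = new HashMap<>();
        System.out.println(unique(keys, prices, latest) + " window=" + latest);
        System.out.println(firstUnique(keys, prices, first) + " window=" + first);
    }
}
```

After the same four events, the unique window holds the newest price for each symbol while the firstUnique window still holds the oldest, so a lookup against the former returns the last traded price.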
Output Rate Limiting
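I cover this here because it pairs well with unique.
Basic syntax: output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>)
<output-type> is one of "first", "last" and "all"; the default is all.
With an ordinary window, firing on every event may be too frequent; often it is enough to fire once per fixed number of events or per time interval.
This suits unique especially well: unique is normally used to track the latest state, so firing on every event is pointless and firing periodically is enough.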
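A rough model of "output last every N events" combined with group by is sketched below. OutputLastEverySketch is a hypothetical class, not Siddhi code, and it rests on one assumption: a single event counter is shared by all groups, with the last row of each group released when the counter reaches N. The test run reported later in this section is consistent with that assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "output last every N events" with group by; not Siddhi code.
class OutputLastEverySketch {
    private final int everyN;
    private int seen = 0;  // assumption: one counter shared by all groups
    private final Map<String, String> lastPerGroup = new LinkedHashMap<>();

    OutputLastEverySketch(int everyN) {
        this.everyN = everyN;
    }

    // Returns the rows released by this arrival (empty until the N-th event of a round).
    List<String> send(String symbol, float price) {
        lastPerGroup.put(symbol, symbol + ", " + price);  // remember only the latest row
        if (++seen < everyN) {
            return new ArrayList<>();
        }
        List<String> out = new ArrayList<>(lastPerGroup.values());
        lastPerGroup.clear();
        seen = 0;
        return out;
    }

    public static void main(String[] args) {
        OutputLastEverySketch limiter = new OutputLastEverySketch(5);
        for (int i = 0; i < 5; i++) {
            for (String symbol : new String[]{"WSO2", "IBM"}) {
                List<String> out = limiter.send(symbol, i * 10f);
                if (!out.isEmpty()) System.out.println(out);
            }
        }
    }
}
```

With alternating WSO2 and IBM events, the fifth event overall is a WSO2 event, so the first release pairs WSO2 at 20.0 with IBM at 10.0 rather than two rows with the same price.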
Reusing the earlier query:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.unique(symbol) " +
    "select symbol, price, time " +
    "group by symbol " +
    "output last every 5 events " +
    "insert into outputStream;";
In the result, the group by symbol means each output contains separate rows for WSO2 and IBM.
The event count, however, is shared across groups: output is triggered by 5 events in total, not by 5 WSO2 events or 5 IBM events.
"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
receive events: 2
Event{timestamp=1448010405404, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448010404405, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448010407404, data=[IBM, 40.0, 1447921191000], isExpired=false}
Event{timestamp=1448010407404, data=[WSO2, 40.0, 1447921191000], isExpired=false}
A time interval works the same way:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.unique(symbol) " +
    "select symbol, price, time " +
    "group by symbol " +
    "output last every 5 sec " +
    "insert into outputStream;";
The result:
"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448010645533, data=[WSO2, 40.0, 1447921191000], isExpired=false}
Event{timestamp=1448010645533, data=[IBM, 40.0, 1447921191000], isExpired=false}
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
"WSO2", 90.0, 1447921196000
"IBM", 90.0, 1447921196000
receive events: 2
Event{timestamp=1448010650533, data=[WSO2, 90.0, 1447921196000], isExpired=false}
Event{timestamp=1448010650533, data=[IBM, 90.0, 1447921196000], isExpired=false}
snapshot emits all current events that have arrived so far. It is not normally used on its own like this; I cannot think of a scenario for it.
Example:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.unique(symbol) " +
    "select symbol, price, time " +
    "group by symbol " +
    "output snapshot every 2 sec " +
    "insert into outputStream;";
The result:
"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
receive events: 4
Event{timestamp=1448011434403, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011434405, data=[IBM, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011435405, data=[WSO2, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011435405, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
receive events: 8
Event{timestamp=1448011434403, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011434405, data=[IBM, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011435405, data=[WSO2, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011435405, data=[IBM, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011436405, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448011436405, data=[IBM, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448011437405, data=[WSO2, 30.0, 1447921190000], isExpired=false}
Event{timestamp=1448011437405, data=[IBM, 30.0, 1447921190000], isExpired=false}
window.sort
Sorts events within the window.
<event> sort(<int> windowLength, <string> attribute, <string> order, .. , <string> attributeN, <string> orderN)
order is "asc" or "desc"; the default is asc.
Example:
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.sort(3, price, 'asc') " +
    "select symbol, price, time " +
    "group by symbol " +
    "insert all events into outputStream;";
The length is 3 and price is sorted in ascending order. Once the window holds more than 3 events, i.e. 4, the event that ranks last in ascending price order (the one with the largest price) is output as expired.
The result:
"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
Events{ @timeStamp = 1448875633289, inEvents = [Event{timestamp=1448875633289, data=[WSO2, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
Events{ @timeStamp = 1448875633290, inEvents = [Event{timestamp=1448875633290, data=[IBM, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
Events{ @timeStamp = 1448875634291, inEvents = [Event{timestamp=1448875634291, data=[WSO2, 10.0, 1447921188000], isExpired=false}], RemoveEvents = null }
Events{ @timeStamp = 1448875634291, inEvents = [Event{timestamp=1448875634291, data=[IBM, 10.0, 1447921188000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875634291, data=[IBM, 10.0, 1447921188000], isExpired=true}] }
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
Events{ @timeStamp = 1448875635292, inEvents = [Event{timestamp=1448875635292, data=[WSO2, 20.0, 1447921189000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875635292, data=[WSO2, 20.0, 1447921189000], isExpired=true}] }
Events{ @timeStamp = 1448875635292, inEvents = [Event{timestamp=1448875635292, data=[IBM, 20.0, 1447921189000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875635292, data=[IBM, 20.0, 1447921189000], isExpired=true}] }
Once there are more than 3 events, the current event and the expired event are identical: with ascending order, anything that sorts after the first 3 is expired.
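The eviction rule can be sketched in a few lines. SortWindowSketch is a hypothetical model, not Siddhi code; it assumes the window is re-sorted on every arrival with a stable sort and that the last element is evicted when the size exceeds the limit, so on a price tie the newer event is the one pushed out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sort(length, price, 'asc'); not Siddhi code.
class SortWindowSketch {
    private final int length;
    private final List<Object[]> window = new ArrayList<>();  // {symbol, price}

    SortWindowSketch(int length) {
        this.length = length;
    }

    // Adds an event and returns the evicted event as "symbol price",
    // or null while the window still fits.
    String send(String symbol, float price) {
        window.add(new Object[]{symbol, price});
        // List.sort is stable, so events with equal prices keep their arrival order.
        window.sort((a, b) -> Float.compare((Float) a[1], (Float) b[1]));
        if (window.size() <= length) {
            return null;
        }
        Object[] out = window.remove(window.size() - 1);  // largest price goes
        return out[0] + " " + out[1];
    }

    public static void main(String[] args) {
        SortWindowSketch w = new SortWindowSketch(3);
        for (int i = 0; i < 3; i++) {
            for (String symbol : new String[]{"WSO2", "IBM"}) {
                System.out.println(symbol + " " + (i * 10f) + " -> " + w.send(symbol, i * 10f));
            }
        }
    }
}
```

With the test input, the first three events fill the window, and from the fourth event on every arrival is itself the largest and is evicted immediately, which is why inEvents and RemoveEvents carry the same event.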
window.frequent;window.lossyFrequent
<event> frequent(<int> eventCount, <string> attribute, .. , <string> attributeN), based on the Misra-Gries counting algorithm; see http://www.zhihu.com/question/23480657
For how this processor is implemented, see http://mail.wso2.org/mailarchive/dev/2015-September/055230.html
Honestly, without knowing the algorithm this is quite hard to follow.
String executionPlan =
    "define stream cseEventStream (symbol string, price float, time long); " +
    "@info(name = 'query1') " +
    "from cseEventStream[700 > price]#window.frequent(2, symbol) " +
    "select symbol, price, time " +
    "insert all events into outputStream;";
frequent means: you receive current events, and an arriving event is output if it belongs to the top frequent set; otherwise it is dropped.
Put simply, from current events you keep receiving the events that belong to the top frequent set, and the rest are discarded.
The input:
String str = "attributes to attributes to to events. If no no no no attributes";
String[] strs = str.split(" ");
int i = 0;
long time = 1447921187000L;
for (String s : strs) {
    float p = i * 10;
    inputHandler.send(new Object[]{s, p, time});
    System.out.println(s + ", " + p + ", " + time);
    Thread.sleep(1000);
    i++;
    time = time + 1000;
}
The result, followed by an analysis:
attributes, 0.0, 1447921187000
Events{ @timeStamp = 1448873866506, inEvents = [Event{timestamp=1448873866506, data=[attributes, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
to, 10.0, 1447921188000
Events{ @timeStamp = 1448873867509, inEvents = [Event{timestamp=1448873867509, data=[to, 10.0, 1447921188000], isExpired=false}], RemoveEvents = null }
attributes, 20.0, 1447921189000
Events{ @timeStamp = 1448873868509, inEvents = [Event{timestamp=1448873868509, data=[attributes, 20.0, 1447921189000], isExpired=false}], RemoveEvents = null }
to, 30.0, 1447921190000
Events{ @timeStamp = 1448873869509, inEvents = [Event{timestamp=1448873869509, data=[to, 30.0, 1447921190000], isExpired=false}], RemoveEvents = null }
to, 40.0, 1447921191000
Events{ @timeStamp = 1448873870509, inEvents = [Event{timestamp=1448873870509, data=[to, 40.0, 1447921191000], isExpired=false}], RemoveEvents = null }
events., 50.0, 1447921192000
If, 60.0, 1447921193000
Events{ @timeStamp = 1448873872509, inEvents = [Event{timestamp=1448873872509, data=[If, 60.0, 1447921193000], isExpired=false}], RemoveEvents = [Event{timestamp=1448873868509, data=[attributes, 20.0, 1447921189000], isExpired=true}] }
no, 70.0, 1447921194000
Events{ @timeStamp = 1448873873509, inEvents = [Event{timestamp=1448873873509, data=[no, 70.0, 1447921194000], isExpired=false}], RemoveEvents = [Event{timestamp=1448873870509, data=[to, 40.0, 1447921191000], isExpired=true}, Event{timestamp=1448873872509, data=[If, 60.0, 1447921193000], isExpired=true}] }
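Everything is straightforward at first, while only attributes and to are being sent.
Then events. arrives. attributes and to already occupy both slots, so an expiry pass is triggered: the frequency of every event in the window is decremented by 1, and any event whose frequency reaches 0 is expired.
But the frequencies of attributes and to are both still greater than 0 (2 and 3 before the pass, 1 and 2 after it), so there is nothing in the window to expire,
and the arriving events. is simply dropped. That is why 'events.' never shows up in current events:
we only receive events that are in the top frequent set.
When If arrives, another expiry pass is triggered and every frequency in the window is decremented again.
This time the frequency of attributes reaches 0, so attributes is expired and the event 'If' is placed in the window.
So we see 'If' in current events and 'attributes' in expired events.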
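The walk-through above can be replayed with a compact Misra-Gries style counter. This is a sketch under assumptions, not the Siddhi processor: FrequentWindowSketch is a hypothetical class that tracks only the keys and their counts (the real window also stores the latest event per key), and it decrements every tracked key when an unseen key arrives and no slot is free.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Misra-Gries style model of frequent(capacity, key); not Siddhi code.
class FrequentWindowSketch {
    private final int capacity;
    private final Map<String, Integer> counts = new LinkedHashMap<>();

    FrequentWindowSketch(int capacity) {
        this.capacity = capacity;
    }

    // Returns "in=<key>", optionally followed by " expired=[...]",
    // or "dropped" when the arriving key could not get a slot.
    String send(String key) {
        if (counts.containsKey(key)) {
            counts.merge(key, 1, Integer::sum);
            return "in=" + key;
        }
        List<String> expired = new ArrayList<>();
        if (counts.size() == capacity) {
            // No free slot: decrement every tracked key and expire those that reach zero.
            Iterator<Map.Entry<String, Integer>> it = counts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> e = it.next();
                if (e.getValue() == 1) {
                    expired.add(e.getKey());
                    it.remove();
                } else {
                    e.setValue(e.getValue() - 1);
                }
            }
            if (counts.size() == capacity) {
                return "dropped";  // nobody reached zero, so the newcomer is discarded
            }
        }
        counts.put(key, 1);
        return expired.isEmpty() ? "in=" + key : "in=" + key + " expired=" + expired;
    }

    public static void main(String[] args) {
        FrequentWindowSketch w = new FrequentWindowSketch(2);
        String input = "attributes to attributes to to events. If no no no no attributes";
        for (String word : input.split(" ")) {
            System.out.println(word + " -> " + w.send(word));
        }
    }
}
```

Replaying the test sentence drops 'events.', expires 'attributes' when 'If' arrives, and expires both 'to' and 'If' when 'no' arrives, in line with the RemoveEvents entries in the log.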
<event> lossyFrequent(<double> supportThreshold, <double> errorBound, <string> attribute, .. , <string> attributeN), based on the Lossy Counting algorithm; see http://stackoverflow.com/questions/8033012/what-is-lossy-counting
I have not tested it; presumably only the algorithm that decides expiry differs, and the rest is much the same.

