Storm Windowing
簡介
Storm可同時處理窗口內的所有tuple。窗口可以從時間或數量上來划分,由如下兩個因素決定:
- 窗口的長度,可以是時間間隔或Tuple數量;
- 滑動間隔(sliding Interval),可以是時間間隔或Tuple數量;
要確保topo的過期時間大於窗口的大小加上滑動間隔
Sliding Window:滑動窗口
按照固定的時間間隔或者Tuple數量滑動窗口。
- 如果滑動間隔和窗口大小一樣則等同於滾窗,
- 如果滑動間隔大於窗口大小則會丟失數據,
- 如果滑動間隔小於窗口大小則會窗口重疊。
Tumbling Window:滾動窗口
元組被單個窗口處理,一個元組只屬於一個窗口,不會有窗口重疊。
根據我自己的經驗其實一般用滾動就可以了
構造builder的時候支持以下的配置
(時間和數量的排列組合):
- withWindow(Count windowLength, Count slidingInterval)
滑窗 窗口長度:tuple數, 滑動間隔: tuple數 - withWindow(Count windowLength)
滑窗 窗口長度:tuple數, 滑動間隔: 每個tuple進來都滑 - withWindow(Count windowLength, Duration slidingInterval)
滑窗 窗口長度:tuple數, 滑動間隔: 時間間隔 - withWindow(Duration windowLength, Duration slidingInterval)
滑窗 窗口長度:時間間隔, 滑動間隔: 時間間隔 - withWindow(Duration windowLength)
滑窗 窗口長度:時間間隔, 滑動間隔: 每個tuple進來都滑 - withWindow(Duration windowLength, Count slidingInterval)
滑窗 窗口長度:時間間隔, 滑動間隔: 時間間隔 - withTumblingWindow(BaseWindowedBolt.Count count)
滾窗 窗口長度:Tuple數 - withTumblingWindow(BaseWindowedBolt.Duration duration)
滾窗 窗口長度:時間間隔
Tuple時間戳和亂序
storm支持追蹤源數據的時間戳。
Event time 和Process time
默認的時間戳是處理元組時的bolt窗口生成的,
Event time,事件時間,通常這個時間會帶在Tuple中;
Process time,到某一個處理環節的時間。
舉例:A今天早上9點告訴B,說C昨天晚上9點在濱江國際;
這條信息中,可以認為C在濱江國際的Event time是昨天晚上9點,B接收到這條信息的時間,即Process time,是今天早上9點。
配置時間戳字段(timestamp field)
windows按照時間划分時,默認是Process time,也可以指定為Tuple中的Event time。
如果以Event time來划分窗口:
- Tuple落入到哪個窗口,是看tuple里的Event time。
- 窗口向后推進,主要依靠Event time的增長;
public BaseWindowedBolt withTimestampField(String fieldName)
延時(lag)和水位線(watermark)
從當前最后一條數據算起,往前減去lag,得到一個時間,這個時間就是watermark;
認為watermark之前的數據都已經到了。收到06:01:00的數據時,認為06:00:00的數據都到了。給他們入window。
這樣實際是一個延時處理,等到了06:01:00時,我才開始將06:00:00的數據放入窗口。
如果很不巧,06:00:00的數據在06:01:00之后,lag為60s,不好意思,進不了窗口。此數據不會被處理,並且會在worker的日志中加一行INFO信息。
public class SlidingWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
for(Tuple tuple: inputWindow.get()) {
// do the windowing computation
...
}
// emit the results
collector.emit(new Values(computedValue));
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("slidingwindowbolt",
new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
1).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}