CEP: Complex Event Processing
Wikipedia Definition
“Complex event processing, or CEP, is event processing that combines data from multiple sources[2] to infer events or patterns that suggest more complicated circumstances. The goal of complex event processing is to identify meaningful events (such as opportunities or threats)[3] and respond to them as quickly as possible.”
From the Wikipedia definition above, the defining characteristics of CEP are:
Complexity: joining multiple streams, window aggregations, and detection of event sequences or patterns
Low latency: results within seconds or milliseconds, e.g. credit-card fraud detection or attack detection
High throughput: tens of thousands of messages per second
CEP and Databases
The concept of CEP appeared fairly early, to address real-time requirements that traditional databases could not meet.
In a traditional database, the data is static and the queries are dynamic, so it cannot emit query results in real time and continuously.
CEP turns this around: the queries are static and the data is dynamic, which satisfies the need for real-time, continuous queries, though it cannot serve ad hoc queries.
Combining CEP with traditional databases therefore helps solve problems in finance, commerce, network monitoring, and similar domains.
A well-known example is Esper, which is very powerful and provides EPL, a SQL-like language that gives users a database-like experience.
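To make the "static query over dynamic data" idea concrete, here is a small, hypothetical example of the kind of continuous query Esper's EPL supports (the statement below is my illustration, not taken from the article): it is registered once and then evaluated against every event that streams in, instead of being issued ad hoc against stored data.

// Hypothetical EPL statement (illustration only), held here as a Java string:
// alert whenever a card makes more than three withdrawals within one minute.
String epl = "select card, count(*) as cnt "
           + "from Withdrawal.win:time(60 sec) "
           + "group by card "
           + "having count(*) > 3";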
CEP in Stream Processing
The notion of stream processing arguably became widely accepted around the time of Storm and Yahoo S4.
It emerged mainly in response to the latency shortcomings of the then-dominant MapReduce systems such as Hadoop. With popular real-time applications like Twitter highlighting how much data freshness matters, the era of real-time big data gradually arrived.
CEP and stream processing were born in different eras, but because their problem domains overlap, the two were bound to converge technically.
In the Storm era, Storm mainly wrapped and exposed a MapReduce-like programming model, so streaming workloads were mostly ETL and simple aggregation.
To satisfy CEP requirements, one could run the Esper engine on top of Storm, but Esper, powerful as it is, is too heavyweight and rather inefficient.
Lighter-weight CEP engines such as Siddhi appeared later,
but we never used them at scale, chiefly because they did not consider event time or out-of-order data, which makes them hard to apply to real production scenarios.
Before the Dataflow paper, no computation platform offered a systematic, platform-level answer to event time and out-of-order data. Flink implements the Dataflow windowing model and solves event time and out-of-order data at the platform level.
Flink also ships a dedicated CEP library, FlinkCEP - Complex event processing for Flink,
and this CEP library does take event time and out-of-order data into account.
Let's first look at how Flink CEP is used.
Flink CEP
Example
We first produce an input stream, where each element pairs an Event object with its event time.
To use event time, besides setting the TimeCharacteristic, in Flink we also have to call assignTimestampsAndWatermarks, which defines both the event time and the watermark:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// (Event, timestamp)
DataStream<Event> input = env.fromElements(
    Tuple2.of(new Event(1, "start", 1.0), 5L),
    Tuple2.of(new Event(2, "middle", 2.0), 1L),
    Tuple2.of(new Event(3, "end", 3.0), 3L),
    Tuple2.of(new Event(4, "end", 4.0), 10L),    // triggers 2, 3, 1
    Tuple2.of(new Event(5, "middle", 5.0), 7L),
    // last element for high final watermark
    Tuple2.of(new Event(5, "middle", 5.0), 100L) // triggers 5, 4
).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {

    @Override
    public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
        return element.f1; // the event time
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
        return new Watermark(lastElement.f1 - 5); // the watermark
    }
}).map(new MapFunction<Tuple2<Event, Long>, Event>() {

    @Override
    public Event map(Tuple2<Event, Long> value) throws Exception {
        return value.f0;
    }
});
Next we define the pattern to match. The requirement is to find a group of events containing "start", "middle", and "end".
See the Flink documentation for the exact syntax; it is not covered in detail here.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("start");
    }
}).followedByAny("middle").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("middle");
    }
}).followedByAny("end").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("end");
    }
});
Finally we run CEP on the input stream.
Here we implement a PatternSelectFunction to handle each matched pattern; the handling logic simply prints the ids of the three matched Event objects.
DataStream<String> result = CEP.pattern(input, pattern).select(
    new PatternSelectFunction<Event, String>() {
        @Override
        public String select(Map<String, List<Event>> pattern) {
            StringBuilder builder = new StringBuilder();
            System.out.println(pattern);
            builder.append(pattern.get("start").get(0).getId()).append(",")
                   .append(pattern.get("middle").get(0).getId()).append(",")
                   .append(pattern.get("end").get(0).getId());
            return builder.toString();
        }
    }
);
result.print();
Now, which events do you think get matched here?
Judging from the order the events are listed above, it should be 1, 2, 3.
But the result is 1, 5, 4, because matching follows event time: ordered by timestamp the stream becomes 2 (t=1), 3 (t=3), 1 (t=5), 5 (t=7), 4 (t=10), so the first "start", "middle", "end" sequence consists of events 1, 5 and 4. This property is crucial in production, because we cannot guarantee the order in which collected data arrives.
Implementation
For the event-time handling, look at processElement in AbstractKeyedCEPPatternOperator:
public void processElement(StreamRecord<IN> element) throws Exception {
    if (isProcessingTime) {
        // there can be no out of order elements in processing time
        NFA<IN> nfa = getNFA();
        processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
        updateNFA(nfa);
    } else { // event time
        long timestamp = element.getTimestamp();
        IN value = element.getValue();

        if (timestamp >= lastWatermark) { // only keep records that are not late
            // we have an event with a valid timestamp, so
            // we buffer it until we receive the proper watermark.
            saveRegisterWatermarkTimer();

            List<IN> elementsForTimestamp = elementQueueState.get(timestamp);
            elementsForTimestamp.add(element.getValue());
            elementQueueState.put(timestamp, elementsForTimestamp); // buffer it in the queue
        }
    }
}
With event time, the operator does not call processEvent and update the NFA right away; it first buffers the element in the elementQueueState queue.
Later, when a watermark arrives and onEventTime is triggered,
it sorts the buffered data by timestamp in ascending order and processes, one by one, the elements whose timestamps do not exceed the watermark. Processing thus follows event-time order, which solves the out-of-order problem.
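To make that concrete, here is a minimal sketch (not the actual Flink source) of what the event-time branch does when a watermark fires: drain the buffered elements in ascending timestamp order, feeding only those not later than the watermark into the NFA.

// Minimal sketch of the onEventTime handling described above; the real Flink code
// differs in details. elementQueueState maps timestamp -> buffered events for that timestamp.
private void onEventTimeSketch(long watermark) throws Exception {
    // 1. collect the buffered timestamps and sort them in ascending order
    List<Long> sortedTimestamps = new ArrayList<>();
    for (Long ts : elementQueueState.keys()) {
        sortedTimestamps.add(ts);
    }
    Collections.sort(sortedTimestamps);

    // 2. feed every buffered event whose timestamp does not exceed the watermark
    //    into the NFA, in event-time order
    NFA<IN> nfa = getNFA();
    for (Long ts : sortedTimestamps) {
        if (ts > watermark) {
            break; // not covered by this watermark yet; keep it buffered
        }
        for (IN event : elementQueueState.get(ts)) {
            processEvent(nfa, event, ts);
        }
        elementQueueState.remove(ts);
    }
    updateNFA(nfa);
}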
Improvement
When actually using Flink CEP in our applications, we found a few inconveniences:
First, patterns have to be written in Java code and compiled, which is verbose and cumbersome and cannot be configured dynamically; we need them to be configurable, or to have some kind of DSL.
Second, only one pattern can be set on a stream at a time; if, for example, we want different patterns for different user instances, that is not supported; we need per-key patterns.
DSL
For the first problem, I initially considered building a full DSL, but the cost is high and the community is already working on SQL support,
so I implemented a simple JSON-based one instead, as follows.
It covers the commonly used Flink CEP constructs and is fairly easy to extend.
A JSONArray represents a pattern sequence; each pattern can define multiple AND/OR conditions.
Each condition has three parts, e.g. ["sql", "contains", "delete"]: "sql" is the field name, "contains" is the operator, and "delete" is the value, meaning: find logs whose sql field contains delete. A sample configuration is shown below.
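As an illustration (this particular config is mine, not from the article), a pattern sequence looking for a log whose sql contains delete, followed by one whose sql contains drop, could be written in this JSON DSL as:

[
  {"id": "first",  "conditions": [[["sql", "contains", "delete"]]]},
  {"id": "second", "conditions": [[["sql", "contains", "drop"]]]}
]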
Now there is no need to write patterns in Java; you simply pass in the configuration, like this:
JSONArray jsonArray = JSON.parseArray("<pattern config>");
CepBuilder<Log> cepBuilder = new CepBuilder<Log>();
Pattern<Log, ?> pattern = cepBuilder.patternSequenceBuilder(jsonArray);
Here I implemented a CepBuilder that converts the JSON configuration directly into a Pattern object.
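The article does not show CepBuilder itself, but a minimal sketch of how such a builder might walk the JSON and chain Flink's Pattern API could look like the following; patternSequenceBuilder and Log.getItem follow the article's names, everything else is my assumption.

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Hypothetical sketch of a CepBuilder: turn the JSON DSL above into a Flink Pattern.
public class CepBuilderSketch {

    public Pattern<Log, ?> patternSequenceBuilder(JSONArray patternSequence) {
        Pattern<Log, ?> pattern = null;
        for (int i = 0; i < patternSequence.size(); i++) {
            JSONObject stage = patternSequence.getJSONObject(i);
            String id = stage.getString("id");
            SimpleCondition<Log> condition = buildCondition(stage.getJSONArray("conditions"));
            // the first stage starts the sequence, later stages are chained with followedByAny
            pattern = (pattern == null)
                    ? Pattern.<Log>begin(id).where(condition)
                    : pattern.followedByAny(id).where(condition);
        }
        return pattern;
    }

    // "conditions" is an OR over groups, each group an AND over [field, op, value] triples
    private static SimpleCondition<Log> buildCondition(final JSONArray orGroups) {
        return new SimpleCondition<Log>() {
            @Override
            public boolean filter(Log log) throws Exception {
                for (int i = 0; i < orGroups.size(); i++) {
                    JSONArray andGroup = orGroups.getJSONArray(i);
                    boolean groupMatches = true;
                    for (int j = 0; j < andGroup.size(); j++) {
                        JSONArray cond = andGroup.getJSONArray(j); // [field, op, value]
                        String actual = String.valueOf(log.getItem(cond.getString(0)));
                        String op = cond.getString(1);
                        String expected = cond.getString(2);
                        boolean matches = "contains".equals(op)
                                ? actual.contains(expected)
                                : actual.equals(expected); // fall back to equality
                        groupMatches &= matches;
                    }
                    if (groupMatches) {
                        return true; // one satisfied group is enough
                    }
                }
                return false;
            }
        };
    }
}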
Configuring Multiple Patterns per Key
To support configuring different patterns for different users, I modified the interface Flink CEP exposes.
Originally, Flink CEP is set up like this:
PatternStream = CEP.pattern(input, pattern)
As you can see, only one pattern can be defined per input,
so I defined a GroupPatternStream that accepts a map of patterns.
public class GroupPatternStream<K, T> {

    // underlying data stream
    private final DataStream<T> inputStream;
    private final Map<K, Pattern<T, ?>> patterns;

    GroupPatternStream(final DataStream<T> inputStream, final Map<K, Pattern<T, ?>> patterns) {
        this.inputStream = inputStream;
        this.patterns = patterns;
    }
    // ...
}
Then, in the createPatternStream logic, every pattern is compiled into its own NFAFactory, and the resulting nfaFactoryMap is passed in when creating the KeyedCEPGroupPatternOperator:
public SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Map<K, Pattern<T, ?>> patterns) {
    final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

    // compile each pattern into its own NFAFactory, keyed the same way as the stream
    Map<K, NFACompiler.NFAFactory<T>> nfaFactoryMap = new HashMap<>();
    if (patterns != null) {
        for (K key : patterns.keySet()) {
            Pattern<T, ?> pattern = patterns.get(key);
            nfaFactoryMap.put(key, NFACompiler.compileFactory(pattern, inputSerializer, false));
        }
    }

    SingleOutputStreamOperator<Map<String, List<T>>> patternStream;
    if (inputStream instanceof KeyedStream) {
        KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
        // isProcessingTime, keySerializer and nfaFactory are fields of the surrounding class (not shown)
        patternStream = keyedStream.transform(
            "KeyedCEPPatternOperator",
            (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
            new KeyedCEPGroupPatternOperator<>(
                inputSerializer,
                isProcessingTime,
                keySerializer,
                nfaFactory,
                nfaFactoryMap,
                true));
    } else {
        // non-keyed streams are not supported
        patternStream = null;
    }
    return patternStream;
}
KeyedCEPGroupPatternOperator is also new; compared with the original KeyedCEPPatternOperator it takes one extra parameter, nfaFactoryMap, and overrides getNFA:
public class KeyedCEPGroupPatternOperator<IN, KEY> extends KeyedCEPPatternOperator {

    Map<KEY, NFACompiler.NFAFactory<IN>> nfaFactoryMap;

    public KeyedCEPGroupPatternOperator(TypeSerializer<IN> inputSerializer,
                                        boolean isProcessingTime,
                                        TypeSerializer<KEY> keySerializer,
                                        NFACompiler.NFAFactory<IN> nfaFactory,
                                        Map<KEY, NFACompiler.NFAFactory<IN>> nfaFactoryMap,
                                        boolean migratingFromOldKeyedOperator) {
        super(inputSerializer, isProcessingTime, keySerializer, nfaFactory,
              migratingFromOldKeyedOperator);
        this.nfaFactoryMap = nfaFactoryMap;
    }

    @Override
    public NFA<IN> getNFA() throws IOException {
        NFA<IN> nfa = (NFA<IN>) nfaOperatorState.value();
        if (nfa == null) {
            Object key = getCurrentKey();
            NFACompiler.NFAFactory<IN> factory = nfaFactoryMap.get(key);
            if (factory != null) {
                nfa = factory.createNFA();
            }
            // if no pattern is defined for this key, fall back to an EmptyNFA
            if (nfa == null) {
                nfa = new EmptyNFA<>();
            }
        }
        return nfa;
    }
}
The core logic is in getNFA; modifying it is what makes per-key patterns possible.
In KeyedCEPPatternOperator, the same NFA is produced every time:
public NFA<IN> getNFA() throws IOException {
    NFA<IN> nfa = nfaOperatorState.value();
    return nfa != null ? nfa : nfaFactory.createNFA();
}
In my version, the current key is first fetched from the context, and a different NFA is created depending on the key, so different keys can be matched against different patterns. These NFA state machines live in the state backend as keyed state, so whenever a record with the corresponding key flows through, its NFA can be retrieved from the state backend.
Then it can be used like this.
First, prepare some test data:
List<Log> logs = new ArrayList<>();

Log log = new Log();
log.putItem("id", "1");
log.putItem("sql", "start counting!");
logs.add(log);

log = new Log();
log.putItem("id", "2");
log.putItem("sql", "start counting!");
logs.add(log);

log = new Log();
log.putItem("id", "1");
log.putItem("sql", "end counting");
logs.add(log);

log = new Log();
log.putItem("id", "2");
log.putItem("sql", "select from 1");
logs.add(log);

log = new Log();
log.putItem("id", "2");
log.putItem("sql", "end counting");
logs.add(log);
DataStream<Log> input = env.fromCollection(logs).keyBy(new KeySelector<Log, String>() {
    public String getKey(Log log) {
        return (String) log.getItem("id");
    }
});
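The Log type used here is the article's own helper class and is not shown; a minimal sketch, assuming it is just a map-backed, serializable POJO:

// Assumed shape of the Log class used in these examples (not shown in the article).
public class Log implements java.io.Serializable {

    private final java.util.Map<String, Object> items = new java.util.LinkedHashMap<>();

    public void putItem(String key, Object value) {
        items.put(key, value);
    }

    public Object getItem(String key) {
        return items.get(key);
    }

    @Override
    public String toString() {
        return items.toString(); // e.g. {id=1, sql=start counting!}
    }
}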
Build the patterns:
JSONArray jsonArray = JSON.parseArray(
    "[{\"id\":\"start\",\"conditions\":[[[\"sql\",\"contains\",\"start\"]]]},{\"id\":\"middle\",\"conditions\":[[[\"sql\",\"contains\",\"end\"]]]}]");
JSONArray jsonArray2 = JSON.parseArray(
    "[{\"id\":\"start\",\"conditions\":[[[\"sql\",\"contains\",\"start\"]]]},{\"id\":\"middle\",\"conditions\":[[[\"sql\",\"contains\",\"select\"]]]},{\"id\":\"end\",\"conditions\":[[[\"sql\",\"contains\",\"end\"]]]}]");

CepBuilder<Log> cepBuilder = new CepBuilder<Log>();
Pattern<Log, ?> pattern = cepBuilder.patternSequenceBuilder(jsonArray);
Pattern<Log, ?> pattern2 = cepBuilder.patternSequenceBuilder(jsonArray2);

Map<String, Pattern<Log, ?>> patternMap = new HashMap<>();
patternMap.put("1", pattern);
patternMap.put("2", pattern2);
For logs with id "1", find the pattern containing "start" then "end".
For logs with id "2", find the pattern containing "start", "select", then "end".
Run CEP:
GroupPatternStream<String, Log> groupPatternStream = new GroupPatternStream<>(input, patternMap);
DataStream<String> result = groupPatternStream.select(
    new PatternSelectFunction<Log, String>() {
        @Override
        public String select(Map<String, List<Log>> pattern) {
            return pattern.toString();
        }
    });
result.print();
And the result:
2> {middle=[{id=2, sql=select from 1}], start=[{id=2, sql=start counting!}], end=[{id=2, sql=end counting}]}
4> {middle=[{id=1, sql=end counting}], start=[{id=1, sql=start counting!}]}
As you can see, different keys matched different patterns. Pretty cool, right?