Stream Computing in Practice: CEP


CEP: Complex Event Processing

Wikipedia definition

“Complex event processing, or CEP, is event processing that combines data from multiple sources[2] to infer events or patterns that suggest more complicated circumstances. The goal of complex event processing is to identify meaningful events (such as opportunities or threats)[3] and respond to them as quickly as possible.”

From the Wikipedia definition above, the main characteristics of CEP are:
Complexity: joins across multiple streams, window aggregation, detection of event sequences or patterns
Low latency: on the order of seconds or milliseconds, e.g. credit-card fraud detection or attack detection
High throughput: tens of thousands of messages per second

CEP and Databases

The concept of CEP appeared fairly early, to address real-time requirements that traditional databases could not meet.
In a traditional database the data is static and the queries are dynamic, but it cannot emit query results in real time or continuously.
CEP turns this around: the queries are static and the data is dynamic, which satisfies the need for real-time, continuous queries, though it cannot serve ad hoc queries.
Combining CEP with traditional databases therefore helps solve problems in finance, commerce, network monitoring, and similar domains.
A well-known example is Esper, which is very powerful and provides EPL, an SQL-like language that gives users an experience close to working with a database.
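As a flavor of what that looks like, here is a small illustrative snippet (not from the original post; it assumes the classic Esper 5.x/6.x Java API and a hypothetical Withdrawal event class): a standing EPL query that flags cards with more than three withdrawals in the last minute.

// Illustrative only: classic Esper API, hypothetical Withdrawal POJO with a "card" property
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
epService.getEPAdministrator().getConfiguration().addEventType("Withdrawal", Withdrawal.class);

// The EPL query is static; Withdrawal events stream through it continuously
EPStatement stmt = epService.getEPAdministrator().createEPL(
    "select card, count(*) as cnt from Withdrawal#time(60 sec) group by card having count(*) > 3");

stmt.addListener((newEvents, oldEvents) ->
    System.out.println("suspicious card: " + newEvents[0].get("card")));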

CEP in Stream Computing

The concept of stream computing arguably became widely accepted around the time of Storm and Yahoo S4.
It emerged mainly in response to the poor latency of the then-mainstream MapReduce systems such as Hadoop. Combined with the spread of real-time applications such as Twitter, it made people appreciate the importance of data freshness, and the era of real-time big data gradually arrived.

CEP and stream computing were born in different eras, but since the problem domains they address overlap, the two technologies were bound to converge.
In the Storm era, Storm mainly wrapped and exposed a MapReduce-like programming model, so streaming workloads were mostly ETL and simple aggregation.
To meet CEP requirements you could run the Esper engine on top of Storm, but although Esper is very powerful, it is too heavyweight and fairly inefficient.

Lighter-weight CEP engines appeared later, such as Siddhi,
but we never adopted it at scale either, mainly because it did not address event time and out-of-order data, which makes it hard to use in real production scenarios.

Before the Dataflow paper, no computation platform really offered a systematic, platform-level answer to event time and out-of-order data. Flink implements the Dataflow windowing model and solves event time and out-of-order data at the platform level.
Flink also ships a dedicated CEP library, FlinkCEP - Complex event processing for Flink,
and this CEP library does take event time and out-of-order data into account.
Let's first look at how Flink CEP is used.

Flink CEP

Example

First we create an input stream, where each element consists of an Event object and an event time.
To use event time in Flink, besides setting the TimeCharacteristic we also need to call assignTimestampsAndWatermarks, which defines how the event time and the watermark are derived:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// (Event, timestamp)
DataStream<Event> input = env.fromElements(
    Tuple2.of(new Event(1, "start", 1.0), 5L),
    Tuple2.of(new Event(2, "middle", 2.0), 1L),
    Tuple2.of(new Event(3, "end", 3.0), 3L),
    Tuple2.of(new Event(4, "end", 4.0), 10L), // watermark advances to 10 - 5 = 5, releasing buffered events 2, 3, 1 (timestamps 1, 3, 5)
    Tuple2.of(new Event(5, "middle", 5.0), 7L),
    // last element for high final watermark
    Tuple2.of(new Event(5, "middle", 5.0), 100L) // watermark advances to 95, releasing events 5, 4 (timestamps 7, 10)
).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {

    @Override
    public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
        return element.f1; // the event time is the second field of the tuple
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
        return new Watermark(lastElement.f1 - 5); // the watermark lags the latest timestamp by 5
    }

}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
    @Override
    public Event map(Tuple2<Event, Long> value) throws Exception {
        return value.f0;
    }
});
Next we define the pattern to match. The requirement is to find a sequence of events containing "start", "middle", and "end".
For the detailed syntax, refer to the Flink documentation; we won't go into it here.

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {

    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("start");
    }
}).followedByAny("middle").where(new SimpleCondition<Event>() {

    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("middle");
    }
}).followedByAny("end").where(new SimpleCondition<Event>() {

    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("end");
    }
});
Finally we run CEP on the input stream.
Here we implement a PatternSelectFunction to handle each matched pattern; the logic simply returns the ids of the three matched Event objects.

DataStream<String> result = CEP.pattern(input, pattern).select(
    new PatternSelectFunction<Event, String>() {

        @Override
        public String select(Map<String, List<Event>> pattern) {
            StringBuilder builder = new StringBuilder();
            System.out.println(pattern);
            builder.append(pattern.get("start").get(0).getId()).append(",")
                .append(pattern.get("middle").get(0).getId()).append(",")
                .append(pattern.get("end").get(0).getId());

            return builder.toString();
        }
    }
);

result.print();
Now, which events do you think get matched here?
Judging from the order of the events above, it should be 1, 2, 3.

But the result is 1, 5, 4, because matching follows event-time order. This property is crucial in production, since we cannot guarantee the order in which collected data arrives.

Implementation

For the event-time handling, take a look at the implementation in AbstractKeyedCEPPatternOperator:

    public void processElement(StreamRecord<IN> element) throws Exception {
        if (isProcessingTime) {
            // there can be no out of order elements in processing time
            NFA<IN> nfa = getNFA();
            processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
            updateNFA(nfa);

        } else { //EventTime
            long timestamp = element.getTimestamp();
            IN value = element.getValue();

            if (timestamp >= lastWatermark) { // only process records that are not late

                // we have an event with a valid timestamp, so
                // we buffer it until we receive the proper watermark.

                saveRegisterWatermarkTimer();

                List<IN> elementsForTimestamp =  elementQueueState.get(timestamp);
                elementsForTimestamp.add(element.getValue());
                elementQueueState.put(timestamp, elementsForTimestamp); // buffer the element keyed by its timestamp
            }
        }
    }
In event time mode, the operator does not call processEvent and update the NFA right away; instead it first buffers the element in the elementQueueState queue.
Later, when a watermark arrives and onEventTime fires,
the buffered data is sorted by timestamp in ascending order, and every element whose timestamp is not beyond the watermark is taken out and processed one by one. This enforces event-time order and resolves the out-of-order problem.
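The draining happens in onEventTime. The snippet below is a simplified paraphrase of that callback, not the exact Flink source, reusing the names already seen above (getNFA, processEvent, updateNFA, elementQueueState, saveRegisterWatermarkTimer):

@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    NFA<IN> nfa = getNFA();

    // sort the buffered timestamps in ascending order
    PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
    for (Long timestamp : elementQueueState.keys()) {
        sortedTimestamps.add(timestamp);
    }

    // drain every timestamp the watermark has passed, feeding events to the NFA in event-time order
    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
        long timestamp = sortedTimestamps.poll();
        for (IN event : elementQueueState.get(timestamp)) {
            processEvent(nfa, event, timestamp);
        }
        elementQueueState.remove(timestamp);
    }

    updateNFA(nfa);
    saveRegisterWatermarkTimer(); // re-register a timer for the next watermark
}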

Improvement

When we actually used Flink CEP in our applications, we found a few inconveniences:

First, patterns have to be written in Java code and compiled, which is verbose, cumbersome, and impossible to configure dynamically; they need to be configurable, or expressed in some kind of DSL.
Second, only one pattern can be set on a stream at a time; there is no way to, say, configure different patterns for different user instances. We need per-key patterns.

DSL

For the first problem, I initially considered building a proper DSL, but that would be fairly costly, and the community was already considering SQL support,
so I implemented a simple JSON-based format first, as follows:

(Image: sample JSON pattern configuration)

This covers the commonly used Flink CEP constructs and is fairly easy to extend.
A JSONArray represents a pattern sequence, and each pattern can define several AND/OR conditions.
Each condition consists of three parts; for example, in ["sql", "contains", "delete"], "sql" is the field name, "contains" is the operator, and "delete" is the value, meaning: find the logs whose sql field contains "delete".
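The configuration image is not reproduced here, but based on the condition format just described and the configurations used in the test further below, a pattern sequence looks roughly like this (illustrative only):

[
  {"id": "start",  "conditions": [[["sql", "contains", "start"]]]},
  {"id": "middle", "conditions": [[["sql", "contains", "select"]]]},
  {"id": "end",    "conditions": [[["sql", "contains", "end"]]]}
]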

With this, patterns no longer need to be written in Java; we simply pass in the configuration, like so:

JSONArray jsonArray = JSON.parseArray("...pattern configuration JSON...");

CepBuilder<Log> cepBuilder = new CepBuilder<Log>();
Pattern<Log, ?>  pattern = cepBuilder.patternSequenceBuilder(jsonArray);
Here CepBuilder is a class I implemented that converts the JSON configuration directly into a Pattern object.
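The post does not include the CepBuilder source. As a rough illustration of the idea (hypothetical code, simplified to a single "contains" condition per pattern, using fastjson and the Log type from the test below), it could look something like this:

// Hypothetical sketch of a JSON -> Pattern builder; the real CepBuilder supports
// multiple AND/OR conditions per pattern, while this version only reads the first
// ["field", "contains", "value"] triple of each pattern.
public class CepBuilder<T extends Log> {

    public Pattern<T, ?> patternSequenceBuilder(JSONArray patternSequence) {
        Pattern<T, ?> pattern = null;

        for (int i = 0; i < patternSequence.size(); i++) {
            JSONObject node = patternSequence.getJSONObject(i);
            String id = node.getString("id");

            // conditions looks like [[["sql","contains","start"]]]; take the first triple
            JSONArray triple = node.getJSONArray("conditions").getJSONArray(0).getJSONArray(0);
            String field = triple.getString(0);
            String value = triple.getString(2); // the operator is assumed to be "contains" here

            SimpleCondition<T> condition = new SimpleCondition<T>() {
                @Override
                public boolean filter(T log) {
                    Object fieldValue = log.getItem(field);
                    return fieldValue != null && fieldValue.toString().contains(value);
                }
            };

            pattern = (pattern == null)
                ? Pattern.<T>begin(id).where(condition)
                : pattern.followedByAny(id).where(condition);
        }
        return pattern;
    }
}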

Configuring multiple patterns per key

To support different patterns for different users, I modified the interface that Flink CEP provides.
Originally, Flink CEP defines a CEP like this:
PatternStream = CEP.pattern(input, pattern)
As you can see, only one pattern can be defined for a given input.

So I defined GroupPatternStream, which takes a map of patterns:

public class GroupPatternStream<K, T> {

    // underlying data stream
    private final DataStream<T> inputStream;

    private final  Map<K, Pattern<T, ?>> patterns;

    GroupPatternStream(final DataStream<T> inputStream, final Map<K, Pattern<T, ?>> patterns) {
        this.inputStream = inputStream;
        this.patterns = patterns;
    }
Then, in the createPatternStream logic, each pattern is compiled into its own NFAFactory, and the resulting nfaFactoryMap is passed in when creating the KeyedCEPGroupPatternOperator:

public SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Map<K, Pattern<T, ?>> patterns) {
    final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
    Map<K, NFACompiler.NFAFactory<T>> nfaFactoryMap = new HashMap<>();

    // compile each key's pattern into its own NFA factory
    if (patterns != null) {
        for (K key : patterns.keySet()) {
            Pattern<T, ?> pattern = patterns.get(key);
            nfaFactoryMap.put(key, NFACompiler.compileFactory(pattern, inputSerializer, false));
        }
    }

    SingleOutputStreamOperator<Map<String, List<T>>> patternStream;

    if (inputStream instanceof KeyedStream) {
        KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
        TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());

        patternStream = keyedStream.transform(
            "KeyedCEPPatternOperator",
            (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
            new KeyedCEPGroupPatternOperator<>(
                inputSerializer,
                isProcessingTime,   // assumed field of GroupPatternStream (not shown above)
                keySerializer,
                nfaFactory,         // default factory required by the parent operator's constructor
                nfaFactoryMap,
                true));
    } else {
        // non-keyed streams are not supported
        patternStream = null;
    }

    return patternStream;
}
KeyedCEPGroupPatternOperator is also new; compared with the original KeyedCEPPatternOperator it takes the extra nfaFactoryMap parameter and overrides the getNFA method.

public class KeyedCEPGroupPatternOperator<IN, KEY> extends KeyedCEPPatternOperator {

    Map<KEY,  NFACompiler.NFAFactory<IN>> nfaFactoryMap;

    public KeyedCEPGroupPatternOperator(
            TypeSerializer<IN> inputSerializer,
            boolean isProcessingTime,
            TypeSerializer<KEY> keySerializer,
            NFACompiler.NFAFactory<IN> nfaFactory,
            Map<KEY, NFACompiler.NFAFactory<IN>> nfaFactoryMap,
            boolean migratingFromOldKeyedOperator) {
        super(inputSerializer, isProcessingTime, keySerializer, nfaFactory,
            migratingFromOldKeyedOperator);

        this.nfaFactoryMap = nfaFactoryMap;
    }

    @Override
    public NFA<IN> getNFA() throws IOException {
        NFA<IN> nfa = (NFA<IN>) nfaOperatorState.value();
        if(nfa == null) {
            Object key = getCurrentKey();
            NFACompiler.NFAFactory<IN> factory =  nfaFactoryMap.get(key);
            if(factory != null){
                nfa = factory.createNFA();
            }

            // if no pattern is defined for this key, fall back to an EmptyNFA
            if(nfa == null){
                nfa = new EmptyNFA<>();
            }
        }
        return nfa;
    }

}
The core logic is in getNFA; this is the method whose behavior I changed to meet the requirement.
In KeyedCEPPatternOperator, the operator always builds the same NFA:

public NFA<IN> getNFA() throws IOException {
    NFA<IN> nfa = nfaOperatorState.value();
    return nfa != null ? nfa : nfaFactory.createNFA();
}
In my version,
the current key is first fetched from the runtime context,
and a different NFA is created depending on that key, so different keys can be matched against different patterns. These NFA state machines live in the state backend as keyed state, so whenever a record for a given key flows through, the corresponding NFA can be fetched back from the state backend.

Then it can be used like this.
First, prepare some test data:

List<Log> logs = new ArrayList<>();

Log log = new Log();
log.putItem("id", "1");
log.putItem("sql", "start counting!");
logs.add(log);

log = new Log();
log.putItem("id", "2");
log.putItem("sql", "start counting!");
logs.add(log);

log = new Log();
log.putItem("id", "1");
log.putItem("sql", "end counting");
logs.add(log);

log = new Log();
log.putItem("id", "2");
log.putItem("sql", "select from 1");
logs.add(log);

log = new Log();
log.putItem("id", "2");
log.putItem("sql", "end counting");
logs.add(log);
DataStream<Log> input = env.fromCollection(logs).keyBy(new KeySelector<Log, String>() {
    public String getKey(Log log){
        return (String)log.getItem("id");
    }
});

Build the patterns:

JSONArray jsonArray = JSON.parseArray(
    "[{\"id\":\"start\",\"conditions\":[[[\"sql\",\"contains\",\"start\"]]]},{\"id\":\"middle\",\"conditions\":[[[\"sql\",\"contains\",\"end\"]]]}]");

JSONArray jsonArray2 = JSON.parseArray(
    "[{\"id\":\"start\",\"conditions\":[[[\"sql\",\"contains\",\"start\"]]]},{\"id\":\"middle\",\"conditions\":[[[\"sql\",\"contains\",\"select\"]]]},{\"id\":\"end\",\"conditions\":[[[\"sql\",\"contains\",\"end\"]]]}]");

CepBuilder<Log> cepBuilder = new CepBuilder<Log>();
Pattern<Log, ?> pattern = cepBuilder.patternSequenceBuilder(jsonArray);
Pattern<Log, ?>  pattern2 = cepBuilder.patternSequenceBuilder(jsonArray2);

Map<String, Pattern<Log, ?>> patternMap = new HashMap<>();
patternMap.put("1", pattern);
patternMap.put("2", pattern2);
For logs with id = "1", match the pattern containing "start" then "end";
for logs with id = "2", match the pattern containing "start", "select", then "end".

Run CEP:

GroupPatternStream<String, Log> groupPatternStream = new GroupPatternStream<>(input, patternMap);
DataStream<String> result = groupPatternStream.select(
    new PatternSelectFunction<Log, String>() {
        @Override
        public String select(Map<String, List<Log>> pattern) {
            return pattern.toString();
        }
    });
result.print();
And we get the following output:
2> {middle=[{id=2, sql=select from 1}], start=[{id=2, sql=start counting!}], end=[{id=2, sql=end counting}]}
4> {middle=[{id=1, sql=end counting}], start=[{id=1, sql=start counting!}]}
As you can see, each key is matched against its own pattern. Pretty cool, right?

