Flink CEP簡單使用


環境准備

使用Flink CEP組件之前需要將FlinkCEP的依賴庫引入到項目中。本文基於1.9.1開發。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

基本概念

事件定義

  • 簡單事件

    處理單一事件,事件的定義可以直接觀察出來,處理過程無需關注多個事件之間的關系,能夠通過簡單的數據處理手段將結果計算出來。

  • 復雜事件

    相對於簡單事件,復雜事件處理的不僅是單一的事件,也處理由多個事件組成的復合事件。復雜事件處理監測分析事件流(Event Streaming),當特點事件發生時來觸發某些動作。

事件關系

復雜事件中事件於事件之間包含多種類型關系,常見的有時序關系、聚合關系、依賴關系及因果關系等。
  • 時序關系

    動作事件和動作事件之間,動作事件和狀態變化事件之間,都存在時間順序。事件和事件的時序關系決定了大部分的時序規則,例如A事件狀態持續為1的同時B事件狀態變為0等。

  • 聚合關系

    動作事件和動作事件之間,狀態事件和狀態事件之間都存在聚合關系,即個體聚合形成整體集合。例如A事件狀態為1的次數為10觸發預警。

  • 層次關系

    分類事件和子類事件的層次關系,從父類到子類是具體化的,從子類到父類是泛化的。

  • 依賴關系

    事務的狀態屬性之間存在彼此的依賴關系和約束關系。例如A事件狀態觸發的前提條件是B事件觸發,則A與B事件之間就形成了依賴關系。

  • 因果關系

    對於完整的動作過程,結果狀態為果,初始狀態和動作都可以視為因。例如A事件狀態的改變導致了B事件的觸發,則A事件就是因,而B事件就是果。

API用法

API 含義 示例 含義
where() 指定匹配條件 pattern.where(_ = 1) 匹配為1的數據
or() 匹配條件,或者關系 pattern.or(_ = 2) 匹配為2的數據
times() 模式發生的次數 pattern.times(2,4) 模式發生2-4次
oneOrMore() 模式發生的次數 pattern.oneOrMore() 發生1次或多次
timesOrMore() 模式發生的次數 pattern.timesOrMore(2) 發生2次或多次
optional() 要么觸發要么不觸發 pattern.times(2).optional() 發生0次或2次
greedy() 貪心匹配 pattern.times(2,4).greedy() 觸發2、3、4次,盡可能重復執行(應用在多個Pattern中)
until() 停止條件 pattern.oneOrMore().until(_ = 0) 遇都0結束
subtype() 定義子類型條件 pattern.subtype(Event.class) 只與Event事件匹配
within() 事件限制 pattern.within(Time.seconds(5)) 匹配在5s內發生
begin() 定義規則開始 Pattern. begin("start") 定義規則開始,事件類型為Event
next() 嚴格鄰近 start.next("next").where(_=1) 嚴格匹配下一個必需是1
followedBy() 寬松近鄰 start.followdBy("middle").where() 會忽略沒有 成功匹配的模式條件
followedByAny() 非確定寬松近鄰 start.followdByAny("middle").where() 可以忽略已經匹配的條件
notNext() 不讓某個事件嚴格緊鄰前一個事件發生 start.notNext("not").where(_=1) 下一個不能為1
notFollowedBy() 不讓某個事件在兩個事件之間發生 start.notFollowedBy("not").where()... 不希望某個事件在某兩個事件之間
consecutive() 嚴格匹配 start.where(_=1).times(3).consecutive() 必需連續三個1才能匹配成功
allowCombinations() 不確定連續 start.where(_=1).times(2).allowCombinations() 只要滿足兩個1就可以匹配成功

案例

使用where()/or()匹配

public class SimpleConditionsTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        //輸入數據源
        DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
                new Event(1L, "a1", "add", 1588298400L),
                new Event(2L, "c1", "add", 1588298400L),
                new Event(3L, "a2", "add", 1588298400L),
                new Event(4L, "b1", "add", 1588298400L),
                new Event(5L, "b1", "add", 1588298400L),
                new Event(6L, "a3", "add", 1588298400L)
        ));

        //1、定義規則  匹配以a和c開頭的用戶
        Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(
                new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {
                        return event.getName().startsWith("a");
                    }
                }
        ).or(
                        new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event event) throws Exception {
                                return event.getName().startsWith("c");
                            }
                        }
                );

        //2、模式檢測
        PatternStream<Event> patternStream = CEP.pattern(input, pattern);

        patternStream.select(
            new PatternSelectFunction<Event, String>() {
                //返回匹配數據的id
                @Override
                public String select(Map<String, List<Event>> map) throws Exception {
                    StringBuffer sb = new StringBuffer();
                    for (Map.Entry<String, List<Event>> entry : map.entrySet()) {
                        Iterator<Event> iterator = entry.getValue().iterator();
                        iterator.forEachRemaining(i -> sb.append(i.getId()).append(","));
                    }
                    sb.deleteCharAt(sb.length() - 1);
                    return sb.toString();
                }
            }
        ).print();

        streamEnv.execute("simpleCEPTest");

    }

}
輸出結果
1
2
3
6
由結果可以看出數據是逐條匹配的。

使用量詞匹配times()

輸入數據:

//輸入數據源
DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "c1", "add", 1588298400L),
        new Event(3L, "a2", "add", 1588298400L),
        new Event(4L, "b1", "add", 1588298400L),
        new Event(5L, "b1", "add", 1588298400L),
        new Event(6L, "a3", "add", 1588298400L)
));

Pattern規則:

Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().startsWith("a");
            }
        }
).times(2);
輸出結果:
1,3
3,6
匹配到兩條以a開頭的數據后輸出(嚴格按照數據輸入的順序往后順延)。先讀到id = 1的記錄,滿足where條件;id = 2不滿足則忽略該條數據;id = 3滿足條件,
且此時已經讀到過id = 1的記錄,剛好兩條記錄,匹配成功輸出。4、5不滿足where條件,忽略;6滿足,3、6組合為一組事件,滿足規則。

Pattern規則:

Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().startsWith("a");
            }
        }
).times(1,3);
輸出結果:
1
1,3
3
1,3,6
3,6
6
匹配1、2、3次.輸入id = 1,匹配成功;然后輸入id = 3,此時匹配1、3(出現2次),再輸出3(3匹配一次),以此類推。匹配的順序與輸入數據的順序一致。
同理 oneOrMore()和timesOrMore()一樣。

使用嚴格匹配consecutive()

輸入數據:

DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "c1", "add", 1588298400L),
        new Event(3L, "a2", "add", 1588298400L),
        new Event(4L, "b1", "add", 1588298400L),
        new Event(5L, "b1", "add", 1588298400L),
        new Event(6L, "a3", "add", 1588298400L)
));

Pattern規則:

Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().startsWith("a");
            }
        }
).times(2).consecutive();
輸出結果:
此時輸出結果為空,沒有滿足規則的數據。

更改輸入數據源(id = 2的記錄改為a1):

DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "a1", "add", 1588298400L),
        new Event(3L, "a2", "add", 1588298400L),
        new Event(4L, "b1", "add", 1588298400L),
        new Event(5L, "b1", "add", 1588298400L),
        new Event(6L, "a3", "add", 1588298400L)
));
輸出結果:
1,2
2,3
此時匹配到兩組數據[1,2]和[2,3],由此可以看出使用嚴格匹配consecutive()后,事件必須是緊鄰的才滿足,如果中間有不滿足條件的事件則忽略,與next()類似。

使用不確定連續匹配allowCombinations()

輸入數據:

//輸入數據源
DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "c1", "add", 1588298400L),
        new Event(3L, "a2", "add", 1588298400L),
        new Event(4L, "b1", "add", 1588298400L),
        new Event(5L, "b1", "add", 1588298400L),
        new Event(6L, "a3", "add", 1588298400L)
));

Pattern規則:

Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().startsWith("a");
            }
        }
).times(2).allowCombinations();
輸出結果:
1,3
1,6
3,6
由結果可以看出,只要輸入的數據流中有2條記錄滿足where條件,就會匹配成功。如果不使用allowCombinations(),不會輸出[1,6]結果。allowCombinations()匹配成功的元素仍然可以與后面的元素繼續匹配。

greedy()使用

greedy()只有在多個pattern中使用時才起作用。在單個Pattern中使用時與不加greedy()是一樣的。
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setParallelism(1);

    //輸入數據源
    DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
            new Event(1L, "a1", "add", 1588298400L),
            new Event(2L, "a2", "add", 1588298400L),
            new Event(3L, "a12", "add", 1588298400L),
            new Event(4L, "b11", "add", 1588298400L),
            new Event(5L, "b12", "add", 1588298400L),
            new Event(6L, "a3", "add", 1588298400L)
    ));

    //1、定義規則  匹配以a和c開頭的用戶

    Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("a");
                }
            }
    ).times(2,3).next("middle").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().length() == 3;
                }
            }).times(1,2);

    //2、模式檢測
    PatternStream<Event> patternStream = CEP.pattern(input, pattern);

    patternStream.select(
            new PatternSelectFunction<Event, String>() {
                //返回匹配數據的id
                @Override
                public String select(Map<String, List<Event>> map) throws Exception {
                    StringBuffer sb = new StringBuffer();
                    for (Map.Entry<String, List<Event>> entry : map.entrySet()) {
                        Iterator<Event> iterator = entry.getValue().iterator();
                        iterator.forEachRemaining(i -> sb.append(i.getName()).append(","));
                        sb.append("|").append(",");
                    }
                    sb.delete(sb.length() - 4 , sb.length() - 1);
                    return sb.toString();
                }
            }
    ).print();

    streamEnv.execute("simpleCEPTest");

}
匹配名稱以a開頭且下一個名稱長度為3的規則。如果不加greedy()輸出結果如下:
a1,a2,|,a12
a1,a2,a12,|,b11
a1,a2,|,a12,b11
a2,a12,|,b11
a1,a2,a12,|,b11,b12
a2,a12,|,b11,b12
其中分隔符|用來將匹配的第一個where和第二個where隔開,有結果可以看出a12這條記錄會在兩個where中都去匹配。
1、讀入a1,滿足start的判斷a開頭,暫存[a1];
2、讀入a2,滿足start的判斷a開頭,且上一個為[a1],組合[a1,a2]滿足整個start判斷(2-3個a開頭的),存入狀態[a1,a2];
3、讀入a12,滿足start的判斷a開頭,存入狀態[a1,a2,a12],[a2,a12];同時也滿足middle判斷,存入狀態[a12],此時整個條件都滿足,輸出結果[a1,a2,|,a12];
4、讀入b11,滿足middle判斷,存入狀態[a12,b11],[b11];此時整個條件都滿足,輸出[a1,a2,a12,|,b11],[a1,a2,|,a12,b11],[a2,a12,|,b11]
5、讀入b12,滿足middle判斷,存入狀態[b11,b12],[b12];此時整個條件滿足,輸出[a1,a2,a12,|,b11,b12],[a2,a12,|,b11,b12];由於[b12]是緊鄰[b11]的,所以這里不會跳過[b11]而單獨應用[b12],因此沒有單獨與[b12]匹配的結果;
6、讀入a3,滿足start判斷,但此時相當於又從頭開始匹配了,存入狀態[a3];

使用greedy()的Pattern:

Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("a");
                }
            }
    ).times(2,3).greedy().next("middle").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().length() == 3;
                }
}).times(1,2);
輸出結果:
a1,a2,a12,|,b11
a2,a12,|,b11
a1,a2,a12,|,b11,b12
a2,a12,|,b11,b12
由結果可以看出,相較於不加greedy(),匹配結果變少了,a12只與前一個where匹配,忽略第二個where。

Groups of patterns使用

  • next()使用

輸入數據:

//輸入數據源
DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "a2", "add", 1588298400L),
        new Event(3L, "c12", "add", 1588298400L),
        new Event(4L, "b11", "add", 1588298400L),
        new Event(5L, "b12", "add", 1588298400L),
        new Event(6L, "c3", "add", 1588298400L),
        new Event(7L, "c7", "add", 1588298400L)
));

Pattern:

 Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().startsWith("a");
            }
    }).times(1,2).next("middle").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("c");
                }
            }
    ).times(1,2);
輸出結果:
a1,a2,c12
a2,c12
a1,a2,c12,c3
a2,c12,c3
next()嚴格緊鄰。
1、讀入a1,滿足start,存入狀態中[a1];
2、讀入a2,滿足start,存入狀態中[a2],[a1,a2];
3、讀入c1,滿足middle,存入狀態中間結果[c12],同時滿足整個條件,輸出結果集[a1,a2,c12]、[a2,c12];此時沒有輸出[a1,c12],這是由於next()是嚴格緊鄰的,不能跳過a2元素;
4、讀入b11,不滿足條件,忽略;
5、讀入b12,不滿足條件,忽略;
6、讀入c3,滿足middle,存入狀態中[c12,c3]、[c3],同時滿足整個條件,輸出結果[a1,a2,c12,c3]、[a2,c12,c3]。
7、讀入c7,此時也不會與前面匹配,跟c3匹配一樣。
  • followedBy()使用

輸入數據:

//輸入數據源
DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "a2", "add", 1588298400L),
        new Event(3L, "c12", "add", 1588298400L),
        new Event(4L, "b11", "add", 1588298400L),
        new Event(5L, "b12", "add", 1588298400L),
        new Event(6L, "c3", "add", 1588298400L),
        new Event(7L, "c7", "add", 1588298400L)
));

Pattern:

Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().startsWith("a");
            }
    }).times(1,2).followedBy("followedBy").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("c");
                }
            }
    ).times(1,2);
輸出結果:
a1,a2,c12
a1,c12
a2,c12
a1,a2,c12,c3
a1,c12,c3
a2,c12,c3
匹配1-2個以a開頭的數據,同時包含1-2個以c開頭的數據。
1、讀入a1,滿足start,存入狀態中[a1];
2、讀入a2,滿足start,存入狀態中[a2],[a1,a2];
3、讀入c12,滿足followedBy,存入狀態中間結果[c12],同時滿足整個條件,輸出結果集[a1,a2,c12]、[a1,c12]、[a2,c12];
4、讀入b11,不滿足條件,忽略;
5、讀入b12,不滿足條件,忽略;
6、讀入c3,滿足follwoedBy,存入狀態中[c12,c3]、[c3],同時滿足整個條件,輸出結果[a1,a2,c12,c3]、[a1,c12,c3]、[a2,c12,ce]。此時不會單獨輸出與c3的匹配,由於c12是緊跟在a1,a2后面的,因此在模式匹配時不能忽略c12元素因此沒有與c3的單獨匹配;
7、讀入c7,此時也不會與前面匹配,跟c3匹配一樣。
與next()不一樣,followedBy()是寬松緊鄰,緩存的中間狀態a1(對比第三步)是可用的。
  • followedByAny()使用

輸入數據:

//輸入數據源
DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "a2", "add", 1588298400L),
        new Event(3L, "c12", "add", 1588298400L),
        new Event(4L, "b11", "add", 1588298400L),
        new Event(5L, "b12", "add", 1588298400L),
        new Event(6L, "c3", "add", 1588298400L),
        new Event(7L, "c7", "add", 1588298400L)
));

Pattern:

Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("a");
                }
            }
    ).times(1,2).followedByAny("followedByAny").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("c");
                }
            }
).times(1,2);
輸出結果:
a1,a2,c12
a1,c12
a2,c12
a1,a2,c12,c3
a1,a2,c3
a1,c12,c3
a1,c3
a2,c12,c3
a2,c3
a1,a2,c3,c7
a1,a2,c7
a1,c3,c7
a1,c7
a2,c3,c7
a2,c7
1、讀入a1,滿足start條件,存入狀態[a1];
2、讀入a2,滿足start條件,存入狀態[a1,a2],[a2];
3、讀入c12,滿足followedByAny條件,存入狀態[c12];同時滿足整個條件,此時輸出結果[a1,a2,c12]、[a1,c12]、[a2,c12];
4、讀入b11,不滿足,忽略;
5、讀入b12,不滿足,忽略;
6、讀入c3,滿足followedByAny條件,存入狀態[c12,c3],[c3];同時滿足整個條件,此時輸出結果[a1,a2,c12,c3]、[a1,a2,c3]、[a1,c12,c3]、[a1,c3]、[a2,c12,c3]、[a2,c3];
7、讀入c7,滿足followedByAny條件,存入狀態[c3,c7],[c7];同時滿足整個條件,此時輸出結果[a1,a2,c3,c7]、[a1,a2,c7]、[a1,c3,c7]、[a1,c7]、[a2,c3,c7]、[a2,c7];
followedByAny()非確定寬松鄰近,由結果可以看出c3可以跳過c12與前面數據匹配(對比followedByAny),只要滿足a在c前面即可。

until()使用

輸入數據:

//輸入數據源
DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "a2", "add", 1588298400L),
        new Event(3L, "c12", "add", 1588298400L),
        new Event(4L, "b11", "add", 1588298400L),
        new Event(5L, "b12", "add", 1588298400L),
        new Event(6L, "c3", "add", 1588298400L),
        new Event(7L, "c7", "add", 1588298400L)
));

Pattern:

Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("a");
                }
            }
    ).oneOrMore().until(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("b");
                }
            }
    );
輸出結果:
a1
a1,a2
a2
unitl()只能跟在oneOrMore()/timesOrMore()這種后面,定義結束條件,上面匹配一個或多個以a開頭的,直到b開頭的停止。
1、輸入a1,滿足start,存入狀態[a1];
2、輸入a2,滿足start,存入狀態[a1,a2],[a2];
3、輸入c12,不滿足start,忽略;
4、輸入b11,滿足until條件,停止檢測;此時輸出結果[a1]、[a1,a2]、[a2]
5、輸入b12,雖然滿足until條件,但是在b11時已經觸發了結果,此時不會再處理觸發,而是從start開始重新再匹配關系;
6、輸入c3,不滿足start,忽略;
7、輸入c7,不滿足,忽略。

notNext()使用

輸入數據:

DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "a2", "add", 1588298400L),
        new Event(3L, "c12", "add", 1588298400L),
        new Event(4L, "b11", "add", 1588298400L),
        new Event(5L, "b12", "add", 1588298400L),
        new Event(6L, "c3", "add", 1588298400L),
        new Event(7L, "c7", "add", 1588298400L)
));

Pattern:

 Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("a");
                }
            }
    ).times(1,2).notNext("notNext").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("c");
                }
            }
    );
輸出結果:
a1
notNext(),不讓某個事件嚴格緊鄰前一個事件。匹配1-2個以a開頭的,但是后面不能跟以c開頭的。
1、讀入a1,滿足start,存入狀態[a1];
2、讀入a2,滿足start,存入狀態[a1,a2],[a2];
3、讀入c12,滿足notNext(),由於a后面不能跟c,所以此時只輸出a1,[a1,a2,c12]、[a2,c12]是滿足判斷條件的;
4、依次讀入后面數據,與上面判斷一樣。

notFollowedBy()使用

輸入數據:

//輸入數據源
DataStream<Event> input = streamEnv.fromCollection(Arrays.asList(
        new Event(1L, "a1", "add", 1588298400L),
        new Event(2L, "a2", "add", 1588298400L),
        new Event(3L, "c12", "add", 1588298400L),
        new Event(4L, "b11", "add", 1588298400L),
        new Event(5L, "b12", "add", 1588298400L),
        new Event(6L, "c3", "add", 1588298400L),
        new Event(7L, "c7", "add", 1588298400L)
));

Pattern:

 Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("a");
                }
            }
    ).times(1,2).notFollowedBy("notFollowed").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("b");
                }
            }
    ).next("next").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) throws Exception {
                    return event.getName().startsWith("c");
                }
            }
    );
輸出結果:
a1,a2,c12
a2,c12
notFollowedBy(),不讓某個事件在兩個事件之間發生,模式序列不能以.notFollowedBy()結束。此處不能讓以b開頭的數據出現在以a和c開頭的數據之間。
1、讀入a1,滿足start,存入狀態[a1];
2、讀入a2,滿足start,存入狀態[a1,a2]、[a2];
3、讀入c12,滿足條件,存入狀態[a1,a2,c12]、[a2,c12];
4、讀入b11,暫不處理;
5、讀入b12,暫不處理;
6、讀入c3,此時整個匹配規則滿足,[a1,a2,c12,b11,b12,c3]、[a2,c12,b11,b12,c3],a和c中間出現了b的狀態;滿足條件表達式。
將結果輸出[a1,a2,c12]、[a2,c12]
7、讀入c7,此時讀入c7需要從頭開始判斷了;因為c3時已經觸發了。

winthin()使用

案例:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    //定義事件時間
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    streamEnv.setParallelism(1);

    DataStream<LoginEvent> input = streamEnv.fromCollection(Arrays.asList(
            new LoginEvent(1, "張三", "fail", 1577080457L),
            new LoginEvent(2, "張三", "fail", 1577080458L),
            new LoginEvent(3, "張三", "fail", 1577080460L),
            new LoginEvent(4, "李四", "fail", 1577080458L),
            new LoginEvent(5, "李四", "success", 1577080462L),
            new LoginEvent(6, "張三", "fail", 1577080462L)
    ))
      //注冊watermark  亂序時間為0
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<LoginEvent>() {

        long maxEventTime = 0L;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxEventTime);
        }

        @Override
        public long extractTimestamp(LoginEvent loginEvent, long l) {
            return loginEvent.getEventTime() * 1000;
        }
    });

    //1、定義規則
    //匹配一個惡意登錄的模式(如果一個用戶連續(在10秒內)失敗三次,則是惡意登錄)
    //每個規則都是以begin開始   把每個規則直接定義出來
    /*
        模式序列
        1、嚴格鄰近 next:所有事件都按照順序滿足模式條件,不允許忽略任意不滿足的模式。
        2、寬松鄰近 followedBy:會忽略沒有 成功匹配的模式條件
        3、非確定寬松鄰近 followedByAny:和寬松鄰近條件相比,非確定寬松鄰近條件指在 模式匹配過程中可以忽略已經匹配的條件
     */
    Pattern<LoginEvent, LoginEvent> nextPattern = Pattern.<LoginEvent>begin("start")
            //第一個fail
            .where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) throws Exception {
                    return "fail".equals(loginEvent.getEventType());
                }
            }).next("fail2").where(new SimpleCondition<LoginEvent>() {
                //第二個fail
                @Override
                public boolean filter(LoginEvent loginEvent) throws Exception {
                    return "fail".equals(loginEvent.getEventType());
                }
            }).next("fail3").where(new SimpleCondition<LoginEvent>() {
                //第三個fail
                @Override
                public boolean filter(LoginEvent loginEvent) throws Exception {
                    return "fail".equals(loginEvent.getEventType());
                }
            })
            //時間限制  10秒內進行匹配,超過這個范圍則失效
            .within(Time.seconds(10));

    //2、模式檢測   需要按照用戶分組
    PatternStream<LoginEvent> patternStream = CEP.pattern(input.keyBy("userName"), nextPattern);

    /*
        3、選擇結果
        3.1  select function 抽取正常事件
        3.2  flat select function 抽取正常事件,可以返回任意數量的結果
        3.3  process function
     */
    SingleOutputStreamOperator<String> result = patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
        /**
         * map中的key為模式序列中pattern的名稱,value為對應的pattern所接受的事件集合
         *
         * @param map
         * @return
         * @throws Exception
         */
        @Override
        public String select(Map<String, List<LoginEvent>> map) throws Exception {
            StringBuffer sb = new StringBuffer();
            String userName = null;
            for (Map.Entry<String, List<LoginEvent>> entry : map.entrySet()) {
                String patternName = entry.getKey();
                List<LoginEvent> patternValue = entry.getValue();
                System.out.println(patternName + ":" + patternValue.toString());
                if (userName == null) {
                    userName = patternValue.get(0).getUserName();
                }
                sb.append(patternValue.get(0).getEventTime()).append(",");
            }
            return userName + " -> " + sb.toString();
        }
    });

    //打印匹配結果
    result.print("result:");

    streamEnv.execute("cepTest");

}
輸出結果:
start:[LoginEvent{id=1, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:17.0}]
fail2:[LoginEvent{id=2, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:18.0}]
fail3:[LoginEvent{id=3, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:20.0}]
result:> 張三 -> 1577080457,1577080458,1577080460,
start:[LoginEvent{id=2, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:18.0}]
fail2:[LoginEvent{id=3, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:20.0}]
fail3:[LoginEvent{id=6, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:22.0}]
result:> 張三 -> 1577080458,1577080460,1577080462,
按name分組匹配10秒內連續三次fail的數據(10秒鍾內匹配有效)。張三在1577080457-1577080462期間總fail4次,因此輸出兩個結果。

更新數據,張三最后一次登錄時間:

 new LoginEvent(6, "張三", "fail", 1577080468L)
輸出結果:
start:[LoginEvent{id=1, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:17.0}]
fail2:[LoginEvent{id=2, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:18.0}]
fail3:[LoginEvent{id=3, userName='張三', eventType='fail', eventTime=2019-12-23 13:54:20.0}]
result:> 張三 -> 1577080457,1577080458,1577080460,
由於第2條數據和第6條數據相差10s,所以導致后面一次匹配無效,必須在10秒內完成。

注意

1、所有模式序列必須以.begin()開始;
2、模式序列不能以.notFollwedBy()結束;
3、"not"類型的模式不能被optional所修飾;

總結

本文只針對Flink CEP的各個模式及API進行初步使用,個人理解可能也存在偏差。文中如有表述不當地方歡迎指正,大家相互學習。

參考文章 https://juejin.im/post/5de1f32af265da05cc3190f9


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM