Flink CEP Examples and Basic Usage


CEP (Complex Event Processing) detects event patterns in an unbounded event stream, so that you can capture the parts of the data that matter.

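A Flink CEP program generally consists of the following steps:

1>. Create the input data stream
2>. Define the pattern (Pattern)
3>. Apply the pattern to the event stream to detect matches
4>. Select the results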

3. Common contiguity modes between individual patterns:

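There are three common modes: strict contiguity (next), relaxed contiguity (followedBy), and non-deterministic relaxed contiguity (followedByAny). There are also NOT variants of strict contiguity (notNext) and relaxed contiguity (notFollowedBy), but these two are used less often. The code below demonstrates the three common modes.

A Flink CEP program needs the following library dependency: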

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep_2.11</artifactId>
      <version>${flink.version}</version>
</dependency>

package org.stsffap.cep.monitoring;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyCEPTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStream = env.fromElements(("a"), ("c"), ("b1"), ("b2"));

        /*---------Strict contiguity: "middle" must directly follow "start"----------------------*/
        Pattern strictPattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).next("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        });

        CEP.pattern(dataStream, strictPattern).select(map -> {
            System.out.println("strictPattern:" + map.get("start").toString());
            System.out.println("strictPattern:" + map.get("middle").toString());
            return map;
        }).print();
        /*---------------------------------------------*/

        /*---------Relaxed contiguity: non-matching events in between are skipped----------------------*/
        Pattern relaxedPattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).followedBy("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        });

        CEP.pattern(dataStream, relaxedPattern).select(map -> {
            System.out.println("relaxedPattern:" + map.get("start").toString());
            System.out.println("relaxedPattern:" + map.get("middle").toString());
            return map;
        }).print();
        /*---------------------------------------------*/


        /*---------Non-deterministic relaxed contiguity: matching events may be skipped too----------------------*/
        Pattern nonDeterminPattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).followedByAny("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        });

        CEP.pattern(dataStream, nonDeterminPattern).select(map -> {
            System.out.println("nonDeterminPattern:" + map.get("start").toString());
            System.out.println("nonDeterminPattern:" + map.get("middle").toString());
            return map;
        }).print();
        /*---------------------------------------------*/

        env.execute("Flink CEP Test");
    }
}

Output:

nonDeterminPattern:[a]
nonDeterminPattern:[b1]
relaxedPattern:[a]
relaxedPattern:[b1]
nonDeterminPattern:[a]
nonDeterminPattern:[b2]
2> {start=[a], middle=[b2]}
1> {start=[a], middle=[b1]}
1> {start=[a], middle=[b1]}

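The strict contiguity pattern produces no output, because c sits between a and the first b. Relaxed contiguity yields (a, b1): it skips c and stops at the first matching b. Non-deterministic relaxed contiguity yields both (a, b1) and (a, b2), because it keeps looking for further matches after b1. The lines prefixed with 1> and 2> come from print(); the prefix is the index of the parallel subtask that emitted the record, which is also why the output order is interleaved.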
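
To make the difference between the three modes easier to reason about, here is a minimal sketch in plain Java that mimics them for a two-step pattern. It does not use Flink: the class ContiguityModel, the Mode enum, and the match method are hypothetical names introduced only for illustration, and the loop is a simplified model of the semantics described above, not Flink's actual matching engine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Simplified, Flink-free model of the three contiguity modes (illustration only). */
public class ContiguityModel {

    /** Hypothetical names standing in for next / followedBy / followedByAny. */
    public enum Mode { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    /** Returns every (start, middle) pair that the given mode would report. */
    public static List<String> match(List<String> events,
                                     Predicate<String> start,
                                     Predicate<String> middle,
                                     Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!start.test(events.get(i))) {
                continue; // not a "start" event
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean hit = middle.test(events.get(j));
                if (hit) {
                    matches.add("(" + events.get(i) + "," + events.get(j) + ")");
                }
                // STRICT: only the immediate successor is considered.
                // RELAXED: skip non-matching events, stop at the first match.
                // NON_DETERMINISTIC_RELAXED: keep scanning past earlier matches.
                if (mode == Mode.STRICT || (mode == Mode.RELAXED && hit)) {
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Same input and conditions as the example above.
        List<String> events = Arrays.asList("a", "c", "b1", "b2");
        Predicate<String> isA = s -> s.equalsIgnoreCase("a");
        Predicate<String> hasB = s -> s.contains("b");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + match(events, isA, hasB, mode));
        }
        // STRICT -> []
        // RELAXED -> [(a,b1)]
        // NON_DETERMINISTIC_RELAXED -> [(a,b1), (a,b2)]
    }
}
```

The only thing that changes between the modes is when the inner loop stops, which is a convenient way to remember how next, followedBy, and followedByAny differ.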

4. Combined pattern examples

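The examples above only cover simple individual patterns. The following examples show slightly more complex combined patterns.
Pattern a b+ c: relaxed contiguity between a and b, strict contiguity between b and c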

        DataStream<String> dataStream = env.fromElements(("a"), ("b1"), ("d1"), ("b2"), ("d2"), ("b3"), ("c"));

        // Pattern a b+ c: relaxed contiguity between a and b, strict contiguity between b and c
        Pattern pattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).followedBy("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        }).oneOrMore().next("last").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("c");
            }
        });

        CEP.pattern(dataStream, pattern).select(map -> {
            System.out.println("pattern:" + map.get("start").toString());
            System.out.println("pattern:" + map.get("middle").toString());
            System.out.println("pattern:" + map.get("last").toString());
            return map;
        }).print();

Output:

pattern:[a]
pattern:[b1, b2, b3]
pattern:[c]
1> {start=[a], middle=[b1, b2, b3], last=[c]}

Pattern a b+ c: strict contiguity between a and b, relaxed contiguity between b and c

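        DataStream<String> dataStream = env.fromElements(("a"), ("b1"), ("d1"), ("b2"), ("d2"), ("b3"), ("c"));
        // Pattern a b+ c: strict contiguity between a and b, relaxed contiguity between b and c
        // (oneOrMore() applies to "middle", so it is b that repeats, not a)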
        Pattern pattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).next("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        }).oneOrMore().followedBy("last").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("c");
            }
        });

        CEP.pattern(dataStream, pattern).select(map -> {
            System.out.println("--------------------------------------");
            System.out.println("pattern:" + map.get("start").toString());
            System.out.println("pattern:" + map.get("middle").toString());
            System.out.println("pattern:" + map.get("last").toString());
            return map;
        }).print();

Output:

--------------------------------------
pattern:[a]
pattern:[b1, b2, b3]
pattern:[c]
--------------------------------------
pattern:[a]
pattern:[b1, b2]
pattern:[c]
--------------------------------------
pattern:[a]
pattern:[b1]
pattern:[c]
3> {start=[a], middle=[b1], last=[c]}
1> {start=[a], middle=[b1, b2, b3], last=[c]}
2> {start=[a], middle=[b1, b2], last=[c]}
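
Why the first combined pattern yields one match while the second yields three can be traced by hand with a small plain-Java model. The sketch below does not use Flink: LoopingPatternModel, match, and findC are hypothetical names, and the logic is a simplified reading of the behaviour shown in the outputs above (b events are collected with gaps skipped, and each prefix of the collected b events is tried against c). It has only been checked against this sample input and is not a substitute for Flink's matching engine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified, Flink-free model of the pattern "a b+ c" (illustration only). */
public class LoopingPatternModel {

    /**
     * @param strictAB true models next("middle"), false models followedBy("middle")
     * @param strictBC true models next("last"), false models followedBy("last")
     */
    public static List<String> match(List<String> events, boolean strictAB, boolean strictBC) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equalsIgnoreCase("a")) {
                continue; // not a "start" event
            }
            // Locate the first b after a.
            int first = -1;
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).contains("b")) {
                    first = j;
                    break;
                }
                if (strictAB) {
                    break; // strict: only the immediate successor counts
                }
            }
            if (first < 0) {
                continue;
            }
            // Model of oneOrMore(): collect b events, skipping non-b events in between.
            List<String> bs = new ArrayList<>();
            for (int j = first; j < events.size(); j++) {
                if (!events.get(j).contains("b")) {
                    continue;
                }
                bs.add(events.get(j));
                // Each prefix of the collected b events is a candidate "middle".
                String c = findC(events, j, strictBC);
                if (c != null) {
                    matches.add("a " + bs + " " + c);
                }
            }
        }
        return matches;
    }

    /** Finds the c that closes a match whose last b sits at index lastB, or null. */
    private static String findC(List<String> events, int lastB, boolean strict) {
        for (int k = lastB + 1; k < events.size(); k++) {
            if (events.get(k).contains("c")) {
                return events.get(k);
            }
            if (strict) {
                return null; // strict: the event right after the last b must be c
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b1", "d1", "b2", "d2", "b3", "c");
        // followedBy("middle") ... next("last")
        System.out.println(match(events, false, true));
        // [a [b1, b2, b3] c]
        // next("middle") ... followedBy("last")
        System.out.println(match(events, true, false));
        // [a [b1] c, a [b1, b2] c, a [b1, b2, b3] c]
    }
}
```

With strict contiguity before c, only the candidate ending in b3 has c as its direct successor, so a single match survives. With relaxed contiguity before c, every candidate can skip ahead to c, which gives the three matches listed above (the order in which Flink prints them may differ).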


Flink CEP in real-time stream processing goes well beyond the simple cases shown above. For more complex usage, see the official Flink documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/libs/cep.html).

