1. Flink CEP overview
CEP (Complex Event Processing) detects event patterns in an unbounded event stream, so that you can capture the important parts of the data.
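2. The four steps of Flink CEP programming
1>. Create the input data stream
2>. Define the pattern (Pattern)
3>. Apply the pattern to the event stream for detection
4>. Select the results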
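3. Common contiguity modes for individual patterns:
Strict contiguity, relaxed contiguity, and non-deterministic relaxed contiguity. There are also NOT variants of strict contiguity and relaxed contiguity, but these two are not commonly used. The code below illustrates the three common modes.
Flink CEP programs need the following library dependency: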
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
package org.stsffap.cep.monitoring;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyCEPTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input stream shared by all three patterns
        DataStream<String> dataStream = env.fromElements("a", "c", "b1", "b2");

        /*--------- Strict contiguity: next() ----------------------*/
        Pattern strictPattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).next("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        });
        CEP.pattern(dataStream, strictPattern).select(map -> {
            System.out.println("strictPattern:" + map.get("start").toString());
            System.out.println("strictPattern:" + map.get("middle").toString());
            return map;
        }).print();
        /*---------------------------------------------*/

        /*--------- Relaxed contiguity: followedBy() ----------------------*/
        Pattern relaxedPattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).followedBy("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        });
        CEP.pattern(dataStream, relaxedPattern).select(map -> {
            System.out.println("relaxedPattern:" + map.get("start").toString());
            System.out.println("relaxedPattern:" + map.get("middle").toString());
            return map;
        }).print();
        /*---------------------------------------------*/

        /*--------- Non-deterministic relaxed contiguity: followedByAny() ----------------------*/
        Pattern nonDeterminPattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object s, Context<Object> context) {
                return s.toString().equalsIgnoreCase("a");
            }
        }).followedByAny("middle").where(new IterativeCondition<Object>() {
            @Override
            public boolean filter(Object o, Context<Object> context) {
                return o.toString().contains("b");
            }
        });
        CEP.pattern(dataStream, nonDeterminPattern).select(map -> {
            System.out.println("nonDeterminPattern:" + map.get("start").toString());
            System.out.println("nonDeterminPattern:" + map.get("middle").toString());
            return map;
        }).print();
        /*---------------------------------------------*/

        env.execute("Flink CEP Test");
    }
}
Output:
nonDeterminPattern:[a]
nonDeterminPattern:[b1]
relaxedPattern:[a]
relaxedPattern:[b1]
nonDeterminPattern:[a]
nonDeterminPattern:[b2]
2> {start=[a], middle=[b2]}
1> {start=[a], middle=[b1]}
1> {start=[a], middle=[b1]}
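As the output shows, the strict contiguity pattern produces no result, because c sits between a and b. Relaxed contiguity outputs (a, b1), and non-deterministic relaxed contiguity outputs (a, b1) and (a, b2).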
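To make the difference between the three modes concrete, here is a minimal plain-Java sketch that mimics how each mode treats the events a, c, b1, b2. It is a toy simulation under simplifying assumptions, not Flink's matching engine; the class `ContiguityToy`, the `Mode` enum, and the `match` helper are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration only: this is NOT how Flink CEP is implemented.
class ContiguityToy {
    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    // Finds every (start, middle) pair for the pattern:
    // an event equal to "a", then an event containing "b".
    static List<String> match(List<String> events, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equalsIgnoreCase("a")) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).contains("b");
                if (isB) {
                    matches.add("(" + events.get(i) + "," + events.get(j) + ")");
                }
                if (mode == Mode.STRICT) {
                    break; // only the event directly after "a" is considered
                }
                if (mode == Mode.RELAXED && isB) {
                    break; // non-matching events are skipped, the first match wins
                }
                // NON_DETERMINISTIC_RELAXED: keep scanning, every later "b" also matches
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "c", "b1", "b2");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + match(events, mode));
        }
    }
}
```

On this input the sketch yields no pair for the strict mode, (a,b1) for the relaxed mode, and (a,b1) plus (a,b2) for the non-deterministic relaxed mode, which lines up with the Flink output above.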
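4. Combined pattern examples
The examples above only cover fairly simple individual patterns. The following examples show slightly more complex combined patterns.
a b+c pattern (the b stage uses oneOrMore()): relaxed contiguity between a and b, strict contiguity between b and c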
DataStream<String> dataStream = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");
// a b+c pattern: relaxed contiguity between a and b, strict contiguity between b and c
Pattern pattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object s, Context<Object> context) {
        return s.toString().equalsIgnoreCase("a");
    }
}).followedBy("middle").where(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object o, Context<Object> context) {
        return o.toString().contains("b");
    }
}).oneOrMore().next("last").where(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object o, Context<Object> context) {
        return o.toString().contains("c");
    }
});
CEP.pattern(dataStream, pattern).select(map -> {
    System.out.println("pattern:" + map.get("start").toString());
    System.out.println("pattern:" + map.get("middle").toString());
    System.out.println("pattern:" + map.get("last").toString());
    return map;
}).print();
Output:
pattern:[a]
pattern:[b1, b2, b3]
pattern:[c]
1> {start=[a], middle=[b1, b2, b3], last=[c]}
a+b c pattern (the b stage uses oneOrMore()): strict contiguity between a and b, relaxed contiguity between b and c
DataStream<String> dataStream = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");
// a+b c pattern: strict contiguity between a and b, relaxed contiguity between b and c
Pattern pattern = Pattern.begin("start").where(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object s, Context<Object> context) {
        return s.toString().equalsIgnoreCase("a");
    }
}).next("middle").where(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object o, Context<Object> context) {
        return o.toString().contains("b");
    }
}).oneOrMore().followedBy("last").where(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object o, Context<Object> context) {
        return o.toString().contains("c");
    }
});
CEP.pattern(dataStream, pattern).select(map -> {
    System.out.println("--------------------------------------");
    System.out.println("pattern:" + map.get("start").toString());
    System.out.println("pattern:" + map.get("middle").toString());
    System.out.println("pattern:" + map.get("last").toString());
    return map;
}).print();
Output:
--------------------------------------
pattern:[a]
pattern:[b1, b2, b3]
pattern:[c]
--------------------------------------
pattern:[a]
pattern:[b1, b2]
pattern:[c]
--------------------------------------
pattern:[a]
pattern:[b1]
pattern:[c]
3> {start=[a], middle=[b1], last=[c]}
1> {start=[a], middle=[b1, b2, b3], last=[c]}
2> {start=[a], middle=[b1, b2], last=[c]}
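The two results above can be reasoned about with a small plain-Java sketch. It is a toy model written for this post's input, assuming the repeated b stage skips non-matching events between its own elements. It is not Flink's actual matching algorithm, and `LoopingPatternToy`, `match`, and `findEnd` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration only: this is NOT how Flink CEP is implemented.
class LoopingPatternToy {
    // Pattern: an event equal to "a", then one or more events containing "b",
    // then an event containing "c".
    // strictStart: the first b must directly follow a.
    // strictEnd:   c must directly follow the last b that was taken.
    static List<String> match(List<String> events, boolean strictStart, boolean strictEnd) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equalsIgnoreCase("a")) {
                continue;
            }
            List<String> taken = new ArrayList<>();
            for (int j = i + 1; j < events.size(); j++) {
                if (!events.get(j).contains("b")) {
                    if (strictStart && taken.isEmpty()) {
                        break; // a was not directly followed by b
                    }
                    continue; // skip the non-matching event
                }
                taken.add(events.get(j));
                // Every prefix of the collected b events is a candidate for the b stage.
                int end = findEnd(events, j, strictEnd);
                if (end >= 0) {
                    matches.add(events.get(i) + " " + taken + " " + events.get(end));
                }
            }
        }
        return matches;
    }

    // Returns the index of the closing "c" event after position lastB, or -1 if none qualifies.
    static int findEnd(List<String> events, int lastB, boolean strict) {
        for (int k = lastB + 1; k < events.size(); k++) {
            if (events.get(k).contains("c")) {
                return k;
            }
            if (strict) {
                return -1; // the event directly after the last b is not c
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b1", "d1", "b2", "d2", "b3", "c");
        System.out.println("a b+c: " + match(events, false, true));
        System.out.println("a+b c: " + match(events, true, false));
    }
}
```

With a strict end, only the candidate that ends in b3 is directly followed by c, so a single match remains. With a relaxed end, each of the three candidates can still reach c, which gives the three matches shown above.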
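5. Flink CEP use cases and summary
In real-time stream processing, Flink CEP is used for far more than the simple cases introduced above, and there are many more complex applications. For details, see the official Flink documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/libs/cep.html).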