1.什么是 CEP
概念:
一個或多個由簡單事件構成的事件流通過一定的規則匹配,然后輸出用戶想得到的數據,滿足規則的復雜事件。
特征:
目標:從有序的簡單事件流中發現一些高階特征
輸入:一個或多個由簡單事件構成的事件流
處理:識別簡單事件之間的內在聯系,多個符合一定規則的簡單事件構成復雜事件
輸出:滿足規則的復雜事件

CEP 用於分析低延遲、頻繁產生的不同來源的事件流。CEP 可以幫助在復雜的、不相關的事件流中找出有意義的模式和復雜的關系,以接近實時或准實時的獲得通知並阻止一些行為。
CEP 支持在流上進行模式匹配,根據模式的條件不同,分為連續的條件或不連續的條件;模式的條件允許有時間的限制,當在條件范圍內沒有達到滿足的條件時,
會導致模式匹配超時。
看起來很簡單,但是它有很多不同的功能:
輸入的流數據,盡快產生結果
在 2 個 event 流上,基於時間進行聚合類的計算
提供實時/准實時的警告和通知
在多樣的數據源中產生關聯並分析模式
高吞吐、低延遲的處理
市場上有多種 CEP 的解決方案,例如 Spark、Samza、Beam 等,但他們都沒有提供專門的 library 支持。但是 Flink 提供了專門的 CEP library。
2 Flink CEP
Flink 為 CEP 提供了專門的 Flink CEP library,它包含如下組件:
Event Stream
pattern 定義
pattern 檢測
生成 Alert

首先,開發人員要在 DataStream 流上定義出模式條件,之后 Flink CEP 引擎進行模式檢測,必要時生成告警。
為了使用 Flink CEP,我們需要導入依賴:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
Event Streams
以登陸事件流為例:
case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: String)
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1)
val loginEventStream = env.fromCollection(List( LoginEvent("1", "192.168.0.1", "fail", "1558430842"), LoginEvent("1", "192.168.0.2", "fail", "1558430843"), LoginEvent("1", "192.168.0.3", "fail", "1558430844"), LoginEvent("2", "192.168.10.10", "success", "1558430845") )).assignAscendingTimestamps(_.eventTime.toLong)
3 Pattern API
4 個體模式
個體模式的條件
5 模式序列
6 模式檢測
7 匹配事件提取
8 超時事件的提取