Flink之復雜事物處理(CEP)


CEP指的是復雜事物處理,FlinkCEP是復雜事物處理庫在Flink上的實現。它使你可以檢測無窮無盡的事件流中的事件模式,從而有機會掌握數據中的重要信息。

入門

首先要導入FlinkCEP的依賴。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.12</artifactId>
    <version>1.11.2</version>
</dependency>

導入依賴后才能使用FlinkCEP中提供的API。以下是FlinkCEP程序的一般模版。

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.process(
    new PatternProcessFunction[Event, Alert]() {
        override def processMatch(
              `match`: util.Map[String, util.List[Event]],
              ctx: PatternProcessFunction.Context,
              out: Collector[Alert]): Unit = {
                out.collect(createAlertFrom(pattern))
        }
    })

Pattern API

Pattern API可以定義要從輸入流中提取的復雜模式序列。

每個復雜模式序列都包含多個簡單模式,即尋找具有相同屬性的單個事件的模式。我們將稱這些簡單模式為模式,以及稱我們在流中搜索的最終復雜模式為模式序列。

個體模式

一個模式(即前面說的簡單模式)可以是單例模式,也可以是循環模式。單例模式接收一個事件,而循環模式可以接收多個模式。

例如在模式匹配的符號中,a b+ c? d代表a后面跟了一個或多個的b,可能有個c,然后有個d。其中a,c?,d就是單例模式,而b+是循環模式。通過添加量詞,可以將單例模式轉換為循環模式。每個模式都可以基於若干個條件來接收事件。

量詞

利用量詞,可以指定循環的次數。還有greedy()方法讓模式匹配盡可能多次,optional()方法讓模式要么匹配若干次,要么不匹配,以模式start為例。

// 匹配4次
start.times(4)

// 要么匹配0次,要么匹配4次
start.times(4).optional()

// 匹配2到4次
start.times(2, 4)

// 匹配2到4次且希望盡可能多地匹配
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy()

// 匹配要么0次要么2到4次
start.times(2, 4).optional()

// 匹配0次、2到4次且盡可能地多
start.times(2, 4).optional().greedy()

// 匹配1次或多次
start.oneOrMore()

// 匹配1次或者盡可能多次
start.oneOrMore().greedy()

// expecting 0 or more occurrences
start.oneOrMore().optional()

// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy()

// expecting 2 or more occurrences
start.timesOrMore(2)

// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy()

// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional()

// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy()

條件

對於每一個模式,都可以設置一些條件來控制該模式是否要開始接收事件。

可以通過pattern.where()pattern.or()pattern.until()來給事件的屬性指定條件。

迭代條件

迭代條件是最普遍的條件類型。可以根據先前接受的事件的屬性或這些事件的子集的統計信息來指定接受后續事件的條件。

以下是一段迭代條件的示例代碼。如果當前事件的名稱以“ foo”開頭,並且該模式先前接受的事件的價格加上當前事件的價格的值不超過5.0,則該迭代條件接受名為模式middle的下一個事件。

middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )

簡單條件

這種類型的條件擴展了IterativeCondition類,並僅基於事件本身的屬性來決定是否接受事件。例如:

start.where(event => event.getName.startsWith("foo"))

最后,您還可以通過pattern.subtype(subClass)方法將接受事件的類型限制為初始事件類型的子類型(此處為Event)。

start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

復合條件

復合條件即各種條件的組合。多個順序排列的where()方法,代表邏輯與。邏輯或可以用or()方法。

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

停止條件

對於循環模式(例如使用了oneOrMore()的),可以通過某些停止條件讓其停止接收事件。

為了更深入理解舉個例子。給定模式a+ until b,給定以下事件序列a1, c, a2, b, a3,則{a1, a2} {a1} {a2} {a3}將被輸出。

下表給出以上條件操作的總結:

模式操作 描述
where(condition) 要匹配當前的模式,就必須要滿足condition定義的條件。多個連續的where()代表條件的邏輯與。
or(condition) 添加一個與現有條件邏輯或的條件。也就是說事件至少要滿足其中一個條件才能匹配。
until(condition) 指定循環模式的停止條件。一旦事件滿足這一條件,模式將不再接收事件。一般oneOrMore()連在一起使用作為停止條件。
subtype(subClass) 為當前模式定義子類型條件。只有屬於此子類型的事件才能被當前模式匹配。
oneOrMore() 此方法定義的條件就是字面意思,就是希望事件出現至少一次。默認情況下是寬松近鄰條件。
timesOrMore(#times) 希望事件出現至少#times次。
times(#ofTimes) 希望事件出現#ofTimes次。
times(#fromTimes, #toTimes) 希望事件出現#fromTimestoTimes
optional() 定義該模式是可選的,即它可能發生也可能不發生。
greedy() 定義該模式是貪婪的,即將改模式重復盡可能多次。當前僅支持在量詞中使用。

復合模式

將個體模式組合起來就是復合模式。

首先,一個模式序列必須以一個初始模式開始:

val start : Pattern[Event, _] = Pattern.begin("start")

接着,用鄰近條件將不同的模式連接起來。FlinkCEP支持以下的連續性條件:

  1. 嚴格鄰近(Strict Contiguity):期望所有匹配事件嚴格地一個接一個地出現,而中間沒有任何不匹配事件,通過next()方法實現。
  2. 寬松鄰近(Relaxed Contiguity):允許匹配事件之間出現不匹配事件,通過followedBy()實現。
  3. 非確定寬松鄰近(Non-Deterministic Relaxed Contiguity):進一步放松了連續性,允許其他匹配忽略某些匹配事件,通過followedByAny()實現。
  4. 另外,如果不希望事件之后有某些事件,可以通過notNext()notFollowedBy()實現。
// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)

// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

寬松鄰近意味着只有第一個成功匹配的事件會被匹配,而非確定鄰近則會在同一個初始模式情況下返回多個匹配。例如,對於一個模式a b,給定以下事件序列a, c, b1, b2,不同的鄰近條件將返回不同的結果:

  1. 嚴格鄰近將返回空集;
  2. 寬松鄰近則會返回{a b1},因為寬松鄰近會跳過不匹配的事件直至下一次匹配;
  3. 非確定寬松鄰近則會返回{a b1} {a b2}

還可以給模式一個時間約束,使其在規定時間內有效。

next.within(Time.seconds(10))

循環模式中的近鄰

在循環模式中也可以規定近鄰條件。這種鄰近條件將被應用於被模式接收的事件之間。例如,對於一個模式a b+ c,給定以下事件序列a, b1, d1, b2, d2, b3, c,不同的鄰近條件將返回不同的結果:

  1. 嚴格鄰近將返回{a b3 c}
  2. 寬松鄰近則會返回{a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c},事件a和事件b之間的事件d被忽略了;
  3. 非確定寬松鄰近則會返回{a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}

對於循環模式來說,默認是寬松鄰近。如果需要其余兩種連續性則需要分別調用consecutive()allowCombinations()方法來指定。對於模式:

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().consecutive()
  .followedBy("end1").where(_.getName().equals("b"))

事件流C D A1 A2 A3 D A4 B將會返回{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B},而在沒有指定嚴格鄰近條件時,返回的是{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

對於模式:

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().allowCombinations()
  .followedBy("end1").where(_.getName().equals("b"))

事件流C D A1 A2 A3 D A4 B將會返回{C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B},而在沒有指定嚴格鄰近條件時,返回的是{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

模式操作 描述
consecutive() oneOrMore()times()一起使用,用於指定匹配事件之間的嚴格鄰近條件。
allowCombinations() oneOrMore()times()一起使用,用於指定匹配事件之間的非確定寬松鄰近條件。

模式組

模式序列也可以通過begin, followedBy, followedByAny, next等條件形成一個模式組GroupPattern,此時該模式序列邏輯上被視為匹配的條件。在GroupPattern上可以指定循環條件,也可以指定鄰近條件。

val start: Pattern[Event, _] = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)

// strict contiguity
val strict: Pattern[Event, _] = start.next(
    Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy(
    Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny(
    Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
模式操作 描述
begin(#name) 定義一個起始模式。
begin(#pattern_sequence) 定義一個起始模式。即可以從個體模式定義起始模式,也可以從模式序列定義起始模式。
next(#name) 附加一個新模式,其滿足嚴格鄰近條件。
next(#pattern_sequence) 同上。
followedBy(#name) 附加一個新模式,其滿足寬松鄰近條件。
followedBy(#pattern_sequence) 同上。
followedByAny(#name) 附加一個新模式,其滿足非確定寬松鄰近。
followedByAny(#pattern_sequence) 同上。
notNext() 不希望前一事件之后有某一事件,滿足嚴格近鄰條件。
notFollowedBy() 不希望前一事件之后有某一事件,滿足寬松近鄰條件。
within(time) 定義一個與模式匹配的事件序列的最大時間間隔。如果一個未完成的事件序列超過了這個時間,它將被丟棄。

匹配后跳過策略

對於給定的模式,同一個事件可能被分配給多次不同的匹配。要控制一個事件將分配多少個匹配,就需要指定名為AfterMatchSkipStrategy的跳過策略。FlinkCEP一共支持5種跳過策略。

Function 描述
AfterMatchSkipStrategy.noSkip() 創建一個NO_SKIP策略,即任意一次匹配都不會被跳過。
AfterMatchSkipStrategy.skipToNext() 創建一個SKIP_TO_NEXT策略,即丟棄以同一事件開始的所有部分匹配。
AfterMatchSkipStrategy.skipPastLastEvent() 創建一個SKIP_PAST_LAST_EVENT策略,即丟棄匹配開始后但結束之前開始的所有部分匹配。使用該策略只會有一個結果被輸出。
AfterMatchSkipStrategy.skipToFirst(patternName) 創建一個SKIP_TO_FIRST策略,即丟棄在匹配開始后但在指定事件第一次發生前開始的所有部分匹配。需要指定一個有效的patternName
AfterMatchSkipStrategy.skipToLast(patternName) 創建一個SKIP_TO_LAST策略,即丟棄在匹配開始后但在指定事件最后一次發生前開始的所有部分匹配。需要指定一個有效的patternName

可以在創建模式時,在begin方法中指定一個AfterMatchSkipStrategy,就可以將該AfterMatchSkipStrategy應用到當前的模式中。如果沒有指定,Flink會默認將AfterMatchSkipStrategy指定為NO_SKIP。

val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)

模式的檢測

在指定要查找的模式序列之后,就可以將其應用到輸入流中,以檢測潛在的匹配。要根據模式序列運行事件流,必須創建一個PatternStream。給定一個輸入流input、一個模式pattern和一個可選的比較器comparator。輸入流可以是dataStream也可以是keyedStream。比較器用於在EventTime事件中對具有相同時間戳的事件進行排序,或者在同一時刻到達的事件。可以通過以下代碼來創建PatternStream

val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // 可選

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

匹配事件提取

創建PatternStream后,可以用select()flatselect()方法,從檢測到的事件序列中提取事件了。

select()方法需要輸入一個select fuction為參數,每個成功匹配的事件都會調用它。select()方法以一個Map[String, Iterable[IN]]來接收匹配到的事件序列,其中key是每個模式的名稱,而value是所有接收到的事件的Iterable類型。需要注意的是,select fuction每次調用只會返回一個結果。

def selectFn(pattern: Map[String, Iterable[IN]]): OUT = {
  val startEvent = pattern.get("start").get.next
  val endEvent = pattern.get("end").get.next
  OUT(startEvent, endEvent)
}

也可以通過flat select fuction來提取匹配事件。flat select fuction與select fuction類似,不過flat select fuction使用Collector作為返回結果的容器,因此每次調用可以返回任意數量的結果。

def flatSelectFn(pattern: Map[String, Iterable[IN]]): collector: COLLECTOR[OUT] = {
  val startEvent = pattern.get("start").get.next
  val endEvent = pattern.get("end").get.next
  for (i <- 0 to startEvent.getValue){
    collector.collect(OUT(startEvent, endEvent))
  }
}

超時事件提取

對於模式中的事件,如果沒有及時處理或者超過了within規定的時間,就會成為超時事件。為了對超時事件進行處理,Pattern API也提供了select和flatSelect兩個方法來對超時事件進行處理。

超時處理程序會接收到目前為止由模式匹配到的所有事件,由一個OutputTag定義接收到的超時事件序列。同樣地,超時事件處理中也有select()方法和flatselect()方法。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
// 創建一個OutputTag並命名為late-data
val lateDataOutputTag = OutputTag[String]("late-data")

val result = patternStream.select(lateDataOutputTag){
  // 提取超時事件
  (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeOutEvent()
}{
  pattern: Map[String, Iterable[Event]] => ComplexEvent()
}
// 調用getSideOutput將超時事件輸出
val lateData = result.getSideOutput(lateDataOutputTag)

下面是使用flatselect()方法的代碼示例。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
// 創建一個OutputTag並命名為side-output
val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
    // 提取超時事件
    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}
// 調用getSideOutput將超時事件輸出
val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)

代碼示例

以下是一段示例代碼。

import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
// 定義輸入事件的樣例類
case class UserAction(userName: String, eventType: String, eventTime: Long)
// 定義輸出事件的樣例類
case class ClickAndBuyAction(userName: String, clickTime: Long, buyTime: Long)

object UserActionDetect {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataList = List(
      UserAction("Adam", "click", 1558430815185L),
      UserAction("Adam", "buy", 1558430815865L),
      UserAction("Adam", "order", 1558430815985L),
      UserAction("Berry", "buy", 1558430815988L),
      UserAction("Adam", "click", 1558430816068L),
      UserAction("Berry", "order", 1558430816074L),
      UserAction("Carl", "click", 1558430816151L),
      UserAction("Carl", "buy", 1558430816641L),
      UserAction("Dennis", "buy", 1558430817128L),
      UserAction("Carl", "click", 1558430817165L),
      UserAction("Ella", "click", 1558430818652L),
    )
    // 1. 創建輸入事件流
    val userLogStream = env.fromCollection(dataList)
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.userName)
    // 2. 用戶自定義模式
    val userActionPattern = Pattern.begin[UserAction]("begin")
      .where(_.eventType == "click")
      .next("next")
      .where(_.eventType == "buy")
    // 3. 調用CEP.pattern方法尋找與模式匹配的事件
    val patternStream = CEP.pattern(userLogStream, userActionPattern)
    // 4. 輸出結果
    val result = patternStream.select(new ClickAndBuyMatch())

    result.print()

    env.execute()
  }
}
// 重寫select方法
class ClickAndBuyMatch() extends PatternSelectFunction[UserAction, ClickAndBuyAction] {
  override def select(map: util.Map[String, util.List[UserAction]]): ClickAndBuyAction = {
    val click: UserAction = map.get("begin").iterator().next()
    val buy: UserAction = map.get("next").iterator().next()
    ClickAndBuyAction(click.userName, click.eventTime, buy.eventTime)
  }
}

pom.xml文件如下:

<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM