Flink基礎(十五):DS簡介(15) Flink CEP簡介(一)


什么是復雜事件CEP?

一個或多個由簡單事件構成的事件流通過一定的規則匹配,然后輸出用戶想得到的數據,滿足規則的復雜事件。

特征:

  • 目標:從有序的簡單事件流中發現一些高階特征
  • 輸入:一個或多個由簡單事件構成的事件流
  • 處理:識別簡單事件之間的內在聯系,多個符合一定規則的簡單事件構成復雜事件
  • 輸出:滿足規則的復雜事件

CEP用於分析低延遲、頻繁產生的不同來源的事件流。CEP可以幫助在復雜的、不相關的事件流中找出有意義的模式和復雜的關系,以接近實時或准實時的獲得通知並阻止一些行為。

CEP支持在流上進行模式匹配,根據模式的條件不同,分為連續的條件或不連續的條件;模式的條件允許有時間的限制,當在條件范圍內沒有達到滿足的條件時,會導致模式匹配超時。

看起來很簡單,但是它有很多不同的功能:

  • 輸入的流數據,盡快產生結果
  • 在2個event流上,基於時間進行聚合類的計算
  • 提供實時/准實時的警告和通知
  • 在多樣的數據源中產生關聯並分析模式
  • 高吞吐、低延遲的處理

市場上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒有提供專門的library支持。但是Flink提供了專門的CEP library。

Flink CEP

Flink為CEP提供了專門的Flink CEP library,它包含如下組件:

  • Event Stream
  • pattern定義
  • pattern檢測
  • 生成Alert

首先,開發人員要在DataStream流上定義出模式條件,之后Flink CEP引擎進行模式檢測,必要時生成告警。

為了使用Flink CEP,我們需要導入依賴:

scala version

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

java version

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Event Streams

登錄事件流

case class LoginEvent(userId: String,
                      ip: String,
                      eventType: String,
                      eventTime: String)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val loginEventStream = env
  .fromCollection(List(
    LoginEvent("1", "192.168.0.1", "fail", "1558430842"),
    LoginEvent("1", "192.168.0.2", "fail", "1558430843"),
    LoginEvent("1", "192.168.0.3", "fail", "1558430844"),
    LoginEvent("2", "192.168.10.10", "success", "1558430845")
  ))
  .assignAscendingTimestamps(_.eventTime.toLong * 1000)

Pattern API

每個Pattern都應該包含幾個步驟,或者叫做state。從一個state到另一個state,通常我們需要定義一些條件,例如下列的代碼

val loginFailPattern = Pattern.begin[LoginEvent]("begin")
  .where(_.eventType.equals("fail"))
  .next("next")
  .where(_.eventType.equals("fail"))
  .within(Time.seconds(10)

每個state都應該有一個標示:

例如: .begin[LoginEvent]("begin")中的"begin"

每個state都需要有一個唯一的名字,而且需要一個filter來過濾條件,這個過濾條件定義事件需要符合的條件

例如: .where(_.eventType.equals("fail"))

我們也可以通過subtype來限制event的子類型:

 
start.subtype(SubEvent.class).where(...);

事實上,你可以多次調用subtype和where方法;而且如果where條件是不相關的,你可以通過or來指定一個單獨的filter函數:

 
pattern.where(...).or(...);

之后,我們可以在此條件基礎上,通過next或者followedBy方法切換到下一個state,next的意思是說上一步符合條件的元素之后緊挨着的元素;而followedBy並不要求一定是挨着的元素。這兩者分別稱為嚴格近鄰和非嚴格近鄰。

 
val strictNext = start.next("middle") val nonStrictNext = start.followedBy("middle") 

最后,我們可以將所有的Pattern的條件限定在一定的時間范圍內:

 
next.within(Time.seconds(10)) 

這個時間可以是Processing Time,也可以是Event Time。

Pattern 檢測

通過一個input DataStream以及剛剛我們定義的Pattern,我們可以創建一個PatternStream:

val input = ...
val pattern = ...

val patternStream = CEP.pattern(input, pattern)
val patternStream = CEP
  .pattern(
    loginEventStream.keyBy(_.userId), loginFailPattern
  )

一旦獲得PatternStream,我們就可以通過select或flatSelect,從一個Map序列找到我們需要的告警信息。

select

select方法需要實現一個PatternSelectFunction,通過select方法來輸出需要的警告。它接受一個Map對,包含string/event,其中key為state的名字,event則為真是的Event。

val loginFailDataStream = patternStream
  .select((pattern: Map[String, Iterable[LoginEvent]]) => {
    val first = pattern.getOrElse("begin", null).iterator.next()
    val second = pattern.getOrElse("next", null).iterator.next()

    (second.userId, second.ip, second.eventType)
  })

其返回值僅為1條記錄。

flatSelect

通過實現PatternFlatSelectFunction,實現與select相似的功能。唯一的區別就是flatSelect方法可以返回多條記錄。

超時事件的處理

通過within方法,我們的parttern規則限定在一定的窗口范圍內。當有超過窗口時間后還到達的event,我們可以通過在select或flatSelect中,實現PatternTimeoutFunction/PatternFlatTimeoutFunction來處理這種情況。

val complexResult = patternStream.select(orderTimeoutOutput) {
  (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
    val createOrder = pattern.get("begin")
    OrderTimeoutEvent(createOrder.get.iterator.next().orderId, "timeout")
  }
} {
  pattern: Map[String, Iterable[OrderEvent]] => {
    val payOrder = pattern.get("next")
    OrderTimeoutEvent(payOrder.get.iterator.next().orderId, "success")
  }
}

val timeoutResult = complexResult.getSideOutput(orderTimeoutOutput)

complexResult.print()
timeoutResult.print()

完整例子:

scala version

object CepExample {

  case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .fromElements(
        LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
        LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
        LoginEvent("user_1", "192.168.0.3", "fail", 4000L),
        LoginEvent("user_2", "192.168.10.10", "success", 5000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(r => r.userId)

    val pattern = Pattern
      .begin[LoginEvent]("first")
      .where(r => r.eventType.equals("fail"))
      .next("second")
      .where(r => r.eventType.equals("fail"))
      .next("third")
      .where(r => r.eventType.equals("fail"))
      .within(Time.seconds(5))

    val patternedStream = CEP.pattern(stream, pattern)

    patternedStream
      .select((pattern: scala.collection.Map[String, Iterable[LoginEvent]]) => {
        val first = pattern("first").iterator.next()
        val second = pattern("second").iterator.next()
        val third = pattern("third").iterator.next()

        (first.userId, first.ip, second.ip, third.ip)
      })
      .print()

    env.execute()
  }
}

java version

POJO類定義

public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long eventTime;

    public LoginEvent(String userId, String ipAddress, String eventType, Long eventTime) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.eventTime = eventTime;
    }

    public LoginEvent() {}

    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId='" + userId + '\'' +
                ", ipAddress='" + ipAddress + '\'' +
                ", eventType='" + eventType + '\'' +
                ", eventTime=" + eventTime +
                '}';
    }
}

業務邏輯編寫

public class CepExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LoginEvent> stream = env
                .fromElements(
                        new LoginEvent("user_1", "0.0.0.0", "fail", 2000L),
                        new LoginEvent("user_1", "0.0.0.1", "fail", 3000L),
                        new LoginEvent("user_1", "0.0.0.2", "fail", 4000L)
               )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<LoginEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
                            @Override
                            public long extractTimestamp(LoginEvent loginEvent, long l) {
                                return loginEvent.eventTime;
                            }
                        })
                )
                .keyBy(r -> r.userId);

        Pattern<LoginEvent, LoginEvent> pattern = Pattern
                .<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .next("second")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .next("third")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .within(Time.seconds(5));


        PatternStream<LoginEvent> patternedStream = CEP.pattern(stream, pattern);

        patternedStream
                .select(new PatternSelectFunction<LoginEvent, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> select(Map<String, List<LoginEvent>> map) throws Exception {
                        LoginEvent first = map.get("first").iterator().next();
                        LoginEvent second = map.get("second").iterator().next();
                        LoginEvent third = map.get("third").iterator().next();
                        return Tuple4.of(first.userId, first.ipAddress, second.ipAddress, third.ipAddress);
                    }
                })
                .print();

        env.execute();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM