Flink應用程序結構開發介紹


Flink程序遵循一定的編程模式。DataStream API 和 DataSet API 基本具有相同的程序結構。以下為一個流式程序的示例代碼來對文本文件進行詞頻統計。

package com.realtime.flink.streaming
import org.apache.flink.apijava.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}

object WordCount {
    def main(args: Array[String]) {
        //第一步:設定執行環境
        val env = SreamExecutionEnvironment.getExecutionEnvironment
       //第二步:指定數據源地址,開始讀取數據
        val text = env.readTextFile("file:///path/file") 
      //第三步:對數據集指定轉換操作邏輯
      val counts : DataStream[(String, int)]  = text
          .flatMap(_.toLowerCase.split(" ")) 
          .fliter(_.nonEmpty)
          .map(_, 1)
          .sum(1)
      //第四步:指定計算結果輸出位置
      if (params.has("output")) {
          counts.writeAsText(params.get("output"))
      } else {
        println("Printing resule to stdout. Use --output to specify output path.")
        counts.print()
      }
      //第五步:指定名稱並觸發流式任務
      env.execute("Streaming WordCount")
    }
}

整個Flink 程序一共分為5步:

1. Flink執行環境

不同的執行環境決定了應用的類型:

StreamExecutionEnvironmen用來流式處理,ExecutionEnvironment是批量數據處理環境.

獲取環境的三種方式:

  • 流處理:

    //設定Flink運行環境,如果在本地啟動則創建本地環境,如果在集群啟動就創建集群環境
    StreamExecutionEnvironment.getExecutionEnvironment
    //指定並行度創建本地執行環境
    StreamExecutionEnvironment.createLocalEnvironment(5)
    //指定遠程JobManager ip和RPC 端口以及運行程序所在的jar包和及其依賴包
    StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")
    

    第三種方式直接從本地代碼創建與遠程集群的JobManager的RPC連接,指定jar將運行程序遠程拷貝到JobManager節點上,Flink應用程序運行在遠程的環境中,本地程序相當於一個客戶端.

  • 批處理:

  //設定Flink運行環境,如果在本地啟動則創建本地環境,如果在集群啟動就創建集群環境
 ExecutionEnvironment.getExecutionEnvironment
  //指定並行度創建本地執行環境
  ExecutionEnvironment.createLocalEnvironment(5)
  //指定遠程JobManager ip和RPC 端口以及運行程序所在的jar包和及其依賴包
 ExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")

注意不同的語言開發Flink應用的時候需要引入不同環境對應的執行環境

2. 初始化數據

  • 創建完執行環境, ExecutionEnvironment 需要提供不同的數據接入接口完成數據初始化,將外部數據轉換成DataStream 或DataSet 數據集.

  • Flink提供了多種從外部讀取數據的連接器,包括批量和實時的數據連接器,能夠將Flink系統與其他第三方系統進行連接,直接獲取外部數據

  • 以下代碼通過readTextFile()方法讀取flle://pathfile路徑中的數據並轉換成DataStream 數據集.

val text: DataStream[String] = env.readTextFlie("flle://pathfile")

讀取文件轉換為DataStream[String]數據集,完成了從本地文件到分布式數據集的轉換

3. 執行轉換操作

對數據集的各種Transformation操作通過不同的Operator來實現,每個Operator來實現,每個Operator內部通過實現Function接口完成數據處理邏輯的定義.

DataStream API 和 DataSet API 提供了很多轉換算子, 如: map, flatMap, filter, keyBy, 用戶只需要定義每個算子執行的函數邏輯,然后應用在數據轉換操作Operator 接口即可.

 val counts: DataStream[String, Int] = text
     .flatMap(_.toLowerCase.split(" "))  //執行flatMap操作
     .filter(_.nonEmpty) //過濾空字段
     .map((_, 1) //執行map轉換操作,轉換成key - value 接口
     .keyBy(0) // 按照指定key對數據重分區
     .sum(1) /執行求和運算操作

flink 定義Function的計算邏輯可以通過以下幾種方式完成定義:

1. 通過創建Class 實現Function接口

//實現MapFunction接口
 class MyMapFunction extends MapFunction[String, String] {
     override def map(t: String): String {
         t.toUpperCase()
     }
 }
 
 val dataStream: DataStream[String] = env.fromElements("hello", flink)
 //將MyMapFunction實現類傳入進去
 dataStream.map(new MyMapFunction)

完成對實現將數據集中的字符串轉換成大寫的數據處理

2. 通過創建匿名類實現Function接口

 val dataStream: DataStream[String] = env.fromElements("hello", flink)
 //通過創建MapFunction匿名實現類來定義map函數的計算邏輯
 dataStream.map(new MapFunction[String, String] {
     //實現對輸入字符串大寫轉換
      override def map(t: String): String{
         t.toUpperCase()
     }
 })

3. 通過實現RichFunction接口

Flink提供了RichFunction接口,用於比較高級的數據處理場景,RichFunction接口中有open、close、getRuntimeContext 以及setRuntimeContext來獲取狀態、緩存等系統內部數據. 與MapFunction類似,RichFunction子類也有RichMapFunction.

//定義匿名類實現RichMapFunction接口,完成對字符串到整形數字的轉換
 dataStream.map(new RichMapFunction[String, Int] {
     //實現對輸入字符串大寫轉換
      override def map(in: String):Int = (in.toInt)
 })

4.分區key指定

某些算子需要指定的key進行轉換,常見的算子有: join 、coGroup、groupBy.需要將DataStream或DataSet數據集轉換成對應KeyedStream 和GroupDataSet ,主要是將相同key的數據路由到相同的Pipeline中

1.根據字段位置指定

//DataStream API聚合計算

val dataStream : DataStream[(String,Int)] = env.fromElements(("a", 1),("c", 2))

//根據第一個字段重新分區,然后對第二個字段進行求和計算
val result = dataStream.keyBy(0).sum(1)

//DataSet API 聚合計算
val dataSet = env.fromElements(("a", 1),("c", 2))
//根據第一個字段進行數據重分區
val groupDataSet : GroupDataSet[(String , Int)] = dataSet.groupBy(0)
//求取相同key值第二個字段的最大值
groupDataSet.max(1)

2.根據字段名稱指定

使用字段名稱需要DataStream 中的數據結構類型必須是Tuple類或者POJOs類

val personDataSet = env.fromElements(new Person("Alex", 18), new Person("Peter", 43))
//指定name字段名稱來確定groupBy 字段
 personDataSet.groupBy("name").max(1)

如果程序中使用Tuple數據類型,通常情況下字段名稱從1開始計算,字段位置索引從0開始計算

val personDataStream = env.fromElements(new Person("Alex", 18), new Person("Peter", 43))
//通過名稱指定第一個字段
personDataStream.keyBy("_1")

//通過位置指定第一個字段
personDataStream.keyBy(0)

使用嵌套的復雜數據結構:

class NestedClass {
    var id: int,
    tuples: (Long, Long, String)){
        def this() {
            this(0, (0, 0, " "))
        }
    }

class CompelexClass(var nested: NestedClass, var tag: String) {
    def this() {
        this(null, " ")
    }
}

通過“nested”獲取整個NestedClass對象所有字段,調用“tag”獲取 CompelexClass中tag字段,調用“nested.id”獲取NestedClass的id字段,調用“nested.tuples._1”獲取NestedClass中tuple元祖第一個字段

3. 通過Key選擇器指定

定義KeySelector,然后復寫getKey方法,從Person對象中獲取name為指定的Key.

case class Person(name: String, age: Int)
var person = env.fromElements(Person("hello", 1), Person("Flink", 3) )
//
val keyed: KeyedStream[WC] = person.keyBy(new KeySelector[Person, String](){
    override def getKey(person: Person): String = person.name
})

5.輸出結果

數據進行轉換操作之后,一般會輸出到外部系統或者控制台上.Flink 除了基本的數據輸出方法,在系統中還定義了很多Connector,用戶通過調用addSink()添加輸出系統定義的DataSink類算子,這樣就可以將數據輸出到外部系統.

//將數據輸出到文件中
counts.writeAsText("file://path/to/savefile")
//將數據輸出控制台
counts.print()

程序觸發

計算邏輯全部操作定義好后,需要調ExecutionEnvironment的execute()方法來觸發程序的執行,execute()方法返回的結果類型為JobExecutionResult,JobExecutionResult包含了程序執行的時間和累加器等指標.

注意: DataStream流式應用需要顯示調用execute()方法,否則Flink應用程序不會執行.但對於DataSet API 輸出算子已經包含對execute()方法的調用,不再需要顯示調用了,否則會出現程序異常.

//調StreamExecutionEnvironment的execute()方法來執行流式應用程序
env.execute("App Name")

總結

本文主要介紹了Flink應用程序開發的5步:獲取執行環境;初始化數據;執行轉換操作;分區key指定;輸出結果以及程序的觸發等開發模式以及內部的一些實現細節.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM