Flink程序遵循一定的編程模式。DataStream API 和 DataSet API 基本具有相同的程序結構。以下為一個流式程序的示例代碼來對文本文件進行詞頻統計。
package com.realtime.flink.streaming
import org.apache.flink.apijava.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
object WordCount {
def main(args: Array[String]) {
//第一步:設定執行環境
val env = SreamExecutionEnvironment.getExecutionEnvironment
//第二步:指定數據源地址,開始讀取數據
val text = env.readTextFile("file:///path/file")
//第三步:對數據集指定轉換操作邏輯
val counts : DataStream[(String, int)] = text
.flatMap(_.toLowerCase.split(" "))
.fliter(_.nonEmpty)
.map(_, 1)
.sum(1)
//第四步:指定計算結果輸出位置
if (params.has("output")) {
counts.writeAsText(params.get("output"))
} else {
println("Printing resule to stdout. Use --output to specify output path.")
counts.print()
}
//第五步:指定名稱並觸發流式任務
env.execute("Streaming WordCount")
}
}
整個Flink 程序一共分為5步:
1. Flink執行環境
不同的執行環境決定了應用的類型:
StreamExecutionEnvironmen用來流式處理,ExecutionEnvironment是批量數據處理環境.
獲取環境的三種方式:
-
流處理:
//設定Flink運行環境,如果在本地啟動則創建本地環境,如果在集群啟動就創建集群環境 StreamExecutionEnvironment.getExecutionEnvironment //指定並行度創建本地執行環境 StreamExecutionEnvironment.createLocalEnvironment(5) //指定遠程JobManager ip和RPC 端口以及運行程序所在的jar包和及其依賴包 StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")
第三種方式直接從本地代碼創建與遠程集群的JobManager的RPC連接,指定jar將運行程序遠程拷貝到JobManager節點上,Flink應用程序運行在遠程的環境中,本地程序相當於一個客戶端.
-
批處理:
//設定Flink運行環境,如果在本地啟動則創建本地環境,如果在集群啟動就創建集群環境
ExecutionEnvironment.getExecutionEnvironment
//指定並行度創建本地執行環境
ExecutionEnvironment.createLocalEnvironment(5)
//指定遠程JobManager ip和RPC 端口以及運行程序所在的jar包和及其依賴包
ExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")
注意不同的語言開發Flink應用的時候需要引入不同環境對應的執行環境
2. 初始化數據
-
創建完執行環境, ExecutionEnvironment 需要提供不同的數據接入接口完成數據初始化,將外部數據轉換成DataStream
或DataSet 數據集. -
Flink提供了多種從外部讀取數據的連接器,包括批量和實時的數據連接器,能夠將Flink系統與其他第三方系統進行連接,直接獲取外部數據
-
以下代碼通過readTextFile()方法讀取flle://pathfile路徑中的數據並轉換成DataStream
數據集.
val text: DataStream[String] = env.readTextFlie("flle://pathfile")
讀取文件轉換為DataStream[String]數據集,完成了從本地文件到分布式數據集的轉換
3. 執行轉換操作
對數據集的各種Transformation操作通過不同的Operator來實現,每個Operator來實現,每個Operator內部通過實現Function接口完成數據處理邏輯的定義.
DataStream API 和 DataSet API 提供了很多轉換算子, 如: map, flatMap, filter, keyBy, 用戶只需要定義每個算子執行的函數邏輯,然后應用在數據轉換操作Operator 接口即可.
val counts: DataStream[String, Int] = text
.flatMap(_.toLowerCase.split(" ")) //執行flatMap操作
.filter(_.nonEmpty) //過濾空字段
.map((_, 1) //執行map轉換操作,轉換成key - value 接口
.keyBy(0) // 按照指定key對數據重分區
.sum(1) /執行求和運算操作
flink 定義Function的計算邏輯可以通過以下幾種方式完成定義:
1. 通過創建Class 實現Function接口
//實現MapFunction接口
class MyMapFunction extends MapFunction[String, String] {
override def map(t: String): String {
t.toUpperCase()
}
}
val dataStream: DataStream[String] = env.fromElements("hello", flink)
//將MyMapFunction實現類傳入進去
dataStream.map(new MyMapFunction)
完成對實現將數據集中的字符串轉換成大寫的數據處理
2. 通過創建匿名類實現Function接口
val dataStream: DataStream[String] = env.fromElements("hello", flink)
//通過創建MapFunction匿名實現類來定義map函數的計算邏輯
dataStream.map(new MapFunction[String, String] {
//實現對輸入字符串大寫轉換
override def map(t: String): String{
t.toUpperCase()
}
})
3. 通過實現RichFunction接口
Flink提供了RichFunction接口,用於比較高級的數據處理場景,RichFunction接口中有open、close、getRuntimeContext 以及setRuntimeContext來獲取狀態、緩存等系統內部數據. 與MapFunction類似,RichFunction子類也有RichMapFunction.
//定義匿名類實現RichMapFunction接口,完成對字符串到整形數字的轉換
dataStream.map(new RichMapFunction[String, Int] {
//實現對輸入字符串大寫轉換
override def map(in: String):Int = (in.toInt)
})
4.分區key指定
某些算子需要指定的key進行轉換,常見的算子有: join 、coGroup、groupBy.需要將DataStream或DataSet數據集轉換成對應KeyedStream 和GroupDataSet ,主要是將相同key的數據路由到相同的Pipeline中
1.根據字段位置指定
//DataStream API聚合計算
val dataStream : DataStream[(String,Int)] = env.fromElements(("a", 1),("c", 2))
//根據第一個字段重新分區,然后對第二個字段進行求和計算
val result = dataStream.keyBy(0).sum(1)
//DataSet API 聚合計算
val dataSet = env.fromElements(("a", 1),("c", 2))
//根據第一個字段進行數據重分區
val groupDataSet : GroupDataSet[(String , Int)] = dataSet.groupBy(0)
//求取相同key值第二個字段的最大值
groupDataSet.max(1)
2.根據字段名稱指定
使用字段名稱需要DataStream 中的數據結構類型必須是Tuple類或者POJOs類
val personDataSet = env.fromElements(new Person("Alex", 18), new Person("Peter", 43))
//指定name字段名稱來確定groupBy 字段
personDataSet.groupBy("name").max(1)
如果程序中使用Tuple數據類型,通常情況下字段名稱從1開始計算,字段位置索引從0開始計算
val personDataStream = env.fromElements(new Person("Alex", 18), new Person("Peter", 43))
//通過名稱指定第一個字段
personDataStream.keyBy("_1")
//通過位置指定第一個字段
personDataStream.keyBy(0)
使用嵌套的復雜數據結構:
class NestedClass {
var id: int,
tuples: (Long, Long, String)){
def this() {
this(0, (0, 0, " "))
}
}
class CompelexClass(var nested: NestedClass, var tag: String) {
def this() {
this(null, " ")
}
}
通過“nested”獲取整個NestedClass對象所有字段,調用“tag”獲取 CompelexClass中tag字段,調用“nested.id”獲取NestedClass的id字段,調用“nested.tuples._1”獲取NestedClass中tuple元祖第一個字段
3. 通過Key選擇器指定
定義KeySelector,然后復寫getKey方法,從Person對象中獲取name為指定的Key.
case class Person(name: String, age: Int)
var person = env.fromElements(Person("hello", 1), Person("Flink", 3) )
//
val keyed: KeyedStream[WC] = person.keyBy(new KeySelector[Person, String](){
override def getKey(person: Person): String = person.name
})
5.輸出結果
數據進行轉換操作之后,一般會輸出到外部系統或者控制台上.Flink 除了基本的數據輸出方法,在系統中還定義了很多Connector,用戶通過調用addSink()添加輸出系統定義的DataSink類算子,這樣就可以將數據輸出到外部系統.
//將數據輸出到文件中
counts.writeAsText("file://path/to/savefile")
//將數據輸出控制台
counts.print()
程序觸發
計算邏輯全部操作定義好后,需要調ExecutionEnvironment的execute()方法來觸發程序的執行,execute()方法返回的結果類型為JobExecutionResult,JobExecutionResult包含了程序執行的時間和累加器等指標.
注意: DataStream流式應用需要顯示調用execute()方法,否則Flink應用程序不會執行.但對於DataSet API 輸出算子已經包含對execute()方法的調用,不再需要顯示調用了,否則會出現程序異常.
//調StreamExecutionEnvironment的execute()方法來執行流式應用程序
env.execute("App Name")
總結
本文主要介紹了Flink應用程序開發的5步:獲取執行環境;初始化數據;執行轉換操作;分區key指定;輸出結果以及程序的觸發等開發模式以及內部的一些實現細節.