一、安裝 Scala 插件
Flink 分別提供了基於 Java 語言和 Scala 語言的 API ,如果想要使用 Scala 語言來開發 Flink 程序,可以通過在 IDEA 中安裝 Scala 插件來提供語法提示,代碼高亮等功能。打開 IDEA , 依次點擊 File => settings => plugins
打開插件安裝頁面,搜索 Scala 插件並進行安裝,安裝完成后,重啟 IDEA 即可生效。
二、Flink 項目初始化
2.1 使用官方腳本構建
Flink 官方支持使用 Maven 和 Gradle 兩種構建工具來構建基於 Java 語言的 Flink 項目;支持使用 SBT 和 Maven 兩種構建工具來構建基於 Scala 語言的 Flink 項目。 這里以 Maven 為例進行說明,因為其可以同時支持 Java 語言和 Scala 語言項目的構建。需要注意的是 Flink 1.9 只支持 Maven 3.0.4 以上的版本,Maven 安裝完成后,可以通過以下兩種方式來構建項目:
1. 直接基於 Maven Archetype 構建
直接使用下面的 mvn 語句來進行構建,然后根據交互信息的提示,依次輸入 groupId , artifactId 以及包名等信息后等待初始化的完成:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
注:如果想要創建基於 Scala 語言的項目,只需要將 flink-quickstart-java 換成 flink-quickstart-scala 即可,后文亦同。
2. 使用官方腳本快速構建
為了更方便的初始化項目,官方提供了快速構建腳本,可以直接通過以下命令來進行調用:
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0
該方式其實也是通過執行 maven archetype 命令來進行初始化,其腳本內容如下:
PACKAGE=quickstart
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=${1:-1.8.0} \
-DgroupId=org.myorg.quickstart \
-DartifactId=$PACKAGE \
-Dversion=0.1 \
-Dpackage=org.myorg.quickstart \
-DinteractiveMode=false
可以看到相比於第一種方式,該種方式只是直接指定好了 groupId ,artifactId ,version 等信息而已。
2.2 使用 IDEA 構建
如果你使用的是開發工具是 IDEA ,可以直接在項目創建頁面選擇 Maven Flink Archetype 進行項目初始化:
如果你的 IDEA 沒有上述 Archetype, 可以通過點擊右上角的 ADD ARCHETYPE
,來進行添加,依次填入所需信息,這些信息都可以從上述的 archetype:generate
語句中獲取。點擊 OK
保存后,該 Archetype 就會一直存在於你的 IDEA 中,之后每次創建項目時,只需要直接選擇該 Archetype 即可:
選中 Flink Archetype ,然后點擊 NEXT
按鈕,之后的所有步驟都和正常的 Maven 工程相同。
三、項目結構
3.1 項目結構
創建完成后的自動生成的項目結構如下:
其中 BatchJob 為批處理的樣例代碼,源碼如下:
import org.apache.flink.api.scala._
object BatchJob {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
....
env.execute("Flink Batch Scala API Skeleton")
}
}
getExecutionEnvironment 代表獲取批處理的執行環境,如果是本地運行則獲取到的就是本地的執行環境;如果在集群上運行,得到的就是集群的執行環境。如果想要獲取流處理的執行環境,則只需要將 ExecutionEnvironment
替換為 StreamExecutionEnvironment
, 對應的代碼樣例在 StreamingJob 中:
import org.apache.flink.streaming.api.scala._
object StreamingJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
env.execute("Flink Streaming Scala API Skeleton")
}
}
需要注意的是對於流處理項目 env.execute()
這句代碼是必須的,否則流處理程序就不會被執行,但是對於批處理項目則是可選的。
3.2 主要依賴
基於 Maven 骨架創建的項目主要提供了以下核心依賴:其中 flink-scala
用於支持開發批處理程序 ;flink-streaming-scala
用於支持開發流處理程序 ;scala-library
用於提供 Scala 語言所需要的類庫。如果在使用 Maven 骨架創建時選擇的是 Java 語言,則默認提供的則是 flink-java
和 flink-streaming-java
依賴。
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
需要特別注意的以上依賴的 scope
標簽全部被標識為 provided ,這意味着這些依賴都不會被打入最終的 JAR 包。因為 Flink 的安裝包中已經提供了這些依賴,位於其 lib 目錄下,名為 flink-dist_*.jar
,它包含了 Flink 的所有核心類和依賴:
scope
標簽被標識為 provided 會導致你在 IDEA 中啟動項目時會拋出 ClassNotFoundException 異常。基於這個原因,在使用 IDEA 創建項目時還自動生成了以下 profile 配置:
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
在 id 為 add-dependencies-for-IDEA
的 profile 中,所有的核心依賴都被標識為 compile,此時你可以無需改動任何代碼,只需要在 IDEA 的 Maven 面板中勾選該 profile,即可直接在 IDEA 中運行 Flink 項目:
四、詞頻統計案例
項目創建完成后,可以先書寫一個簡單的詞頻統計的案例來嘗試運行 Flink 項目,以下以 Scala 語言為例,分別介紹流處理程序和批處理程序的編程示例:
4.1 批處理示例
import org.apache.flink.api.scala._
object WordCountBatch {
def main(args: Array[String]): Unit = {
val benv = ExecutionEnvironment.getExecutionEnvironment
val dataSet = benv.readTextFile("D:\\wordcount.txt")
dataSet.flatMap { _.toLowerCase.split(",")}
.filter (_.nonEmpty)
.map { (_, 1) }
.groupBy(0)
.sum(1)
.print()
}
}
其中 wordcount.txt
中的內容如下:
a,a,a,a,a
b,b,b
c,c
d,d
本機不需要配置其他任何的 Flink 環境,直接運行 Main 方法即可,結果如下:
4.2 流處理示例
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WordCountStreaming {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
dataStream.flatMap { line => line.toLowerCase.split(",") }
.filter(_.nonEmpty)
.map { word => (word, 1) }
.keyBy(0)
.timeWindow(Time.seconds(3))
.sum(1)
.print()
senv.execute("Streaming WordCount")
}
}
這里以監聽指定端口號上的內容為例,使用以下命令來開啟端口服務:
nc -lk 9999
之后輸入測試數據即可觀察到流處理程序的處理情況。
五、使用 Scala Shell
對於日常的 Demo 項目,如果你不想頻繁地啟動 IDEA 來觀察測試結果,可以像 Spark 一樣,直接使用 Scala Shell 來運行程序,這對於日常的學習來說,效果更加直觀,也更省時。Flink 安裝包的下載地址如下:
https://flink.apache.org/downloads.html
Flink 大多數版本都提供有 Scala 2.11 和 Scala 2.12 兩個版本的安裝包可供下載:
下載完成后進行解壓即可,Scala Shell 位於安裝目錄的 bin 目錄下,直接使用以下命令即可以本地模式啟動:
./start-scala-shell.sh local
命令行啟動完成后,其已經提供了批處理 (benv 和 btenv)和流處理(senv 和 stenv)的運行環境,可以直接運行 Scala Flink 程序,示例如下:
最后解釋一個常見的異常:這里我使用的 Flink 版本為 1.9.1,啟動時會拋出如下異常。這里因為按照官方的說明,目前所有 Scala 2.12 版本的安裝包暫時都不支持 Scala Shell,所以如果想要使用 Scala Shell,只能選擇 Scala 2.11 版本的安裝包。
[root@hadoop001 bin]# ./start-scala-shell.sh local
錯誤: 找不到或無法加載主類 org.apache.flink.api.scala.FlinkShell