在pom.xml文件添加以下依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
批處理案例
創建一個scala類
創建一個scala對象
package com.stuscala.batch import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala._ object WordCount { def main(args: Array[String]) { //批處理程序,需要創建ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment //fromElements(elements:_*) --- 從一個給定的對象序列中創建一個數據流,所有的對象必須是相同類型的。
val text = env.fromElements( "Who's there?", "I think I hear them. Stand, ho! Who's there?","hah") val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .groupBy(0)//根據第一個元素分組
.sum(1) //打印
counts.print() } }
流處理案例
1、安裝netcat工具,工具下載地址
https://eternallybored.org/misc/netcat/
2、解壓安裝包
3、將nc.exe 復制到C:\Windows\System32的文件夾下
4、打開cmd。輸入nc 命令OK~
新建一個scala類
package com.stuscala.stream import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.scala._ object WordConutSacla { def main(args: Array[String]) { /* 在Flink程序中首先需要創建一個StreamExecutionEnvironment (如果你在編寫的是批處理程序,需要創建ExecutionEnvironment),它被用來設置運行參數。 當從外部系統讀取數據的時候,它也被用來創建源(sources)。 */ val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("localhost", 9999) val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }//nonEmpty非空的 .map { (_, 1) } .keyBy(0)//通過Tuple的第一個元素進行分組 .timeWindow(Time.seconds(5))//Windows 根據某些特性將每個key的數據進行分組 (例如:在5秒內到達的數據). .sum(1) //將結果流在終端輸出 counts.print //開始執行計算 env.execute("Window Stream WordCount") } }
打開cmd終端
輸入命令:nc -lL -p 9999 回車等待啟動程序
這個時候先運行idea里面的程序
在終端輸入一些單詞
可以在idea里面看到統計的結果
關閉cmd窗口后,idea的程序會自動退出
如果需要對scala代碼一起打包,就需要在pom.xml文件里面添加以下依賴了,否則打包的時候只打包java相關的
<plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <configuration> <recompileMode>incremental</recompileMode> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin>
本文參考鏈接:https://blog.csdn.net/qichangjian/article/details/89152409