6、Flink批處理案例和流處理案例實現-scala


 

 

 

在pom.xml文件添加以下依賴

      <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

 

批處理案例

創建一個scala類

 

 

 

 

 

創建一個scala對象

 

package com.stuscala.batch import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala._ object WordCount { def main(args: Array[String]) { //批處理程序,需要創建ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment //fromElements(elements:_*) --- 從一個給定的對象序列中創建一個數據流,所有的對象必須是相同類型的。
    val text = env.fromElements( "Who's there?", "I think I hear them. Stand, ho! Who's there?","hah") val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .groupBy(0)//根據第一個元素分組
      .sum(1) //打印
 counts.print() } }

 

 

 

流處理案例

1、安裝netcat工具,工具下載地址

https://eternallybored.org/misc/netcat/

2、解壓安裝包

3、將nc.exe 復制到C:\Windows\System32的文件夾下

4、打開cmd。輸入nc 命令OK~

 

新建一個scala類

 

 

 

 

 

package com.stuscala.stream

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._


object WordConutSacla {
  def main(args: Array[String]) {
    /*
    在Flink程序中首先需要創建一個StreamExecutionEnvironment
    (如果你在編寫的是批處理程序,需要創建ExecutionEnvironment),它被用來設置運行參數。
    當從外部系統讀取數據的時候,它也被用來創建源(sources)。
     */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }//nonEmpty非空的
      .map { (_, 1) }
      .keyBy(0)//通過Tuple的第一個元素進行分組
      .timeWindow(Time.seconds(5))//Windows 根據某些特性將每個key的數據進行分組 (例如:在5秒內到達的數據).
      .sum(1)

    //將結果流在終端輸出
    counts.print
    //開始執行計算
    env.execute("Window Stream WordCount")
  }
}

 

 

 

 

打開cmd終端

輸入命令:nc -lL -p 9999 回車等待啟動程序

這個時候先運行idea里面的程序

 

 

 

 

 

 

 

 

在終端輸入一些單詞

 

 

 

可以在idea里面看到統計的結果

 

 

 

關閉cmd窗口后,idea的程序會自動退出

 

 

如果需要對scala代碼一起打包,就需要在pom.xml文件里面添加以下依賴了,否則打包的時候只打包java相關的

    <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

 

 

本文參考鏈接:https://blog.csdn.net/qichangjian/article/details/89152409

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM