flink初體驗-批處理與流處理


一、環境准備

本機環境:jdk11、scala2.12、maven3.6

新建一個maven項目,pom如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.wzy</groupId>
    <artifactId>flink-project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- 聲明綁定到 maven 的 compile 階段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

項目結構如下

1、添加scala源文件,新建一個scala的文件夾,並把它設置成源文件。設置方法 File -> Project Structure -> Modules
2、添加scala框架支持,右鍵項目 -> Add Framework Support -> scala(需要提前配置上scala的sdk)

 

二、wordcount批處理

新建一個txt文件,讓程序讀取文件里內容進行單詞的統計

package wordcount

import org.apache.flink.api.scala._

/**
 * @description 批處理
 */
object WordCountBatch {

  def main(args: Array[String]): Unit = {
    // 創建執行環境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 從文件中讀取數據
    val inputPath = "src/main/resources/wordcount.txt"
    val inputDS: DataSet[String] = env.readTextFile(inputPath)

    // 分詞之后, 對單詞進行 groupby 分組, 然后用 sum 進行聚合
    val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // 打印輸出
    wordCountDS.print()
  }

}

打印結果如下

三、wordcount流處理

新建一個服務端口,讓程序監聽端口,收到一條處理一條,結果累加

開啟臨時服務命令:nc -lk 8088

手動輸入單詞回車就會發送出去

然后程序進行端口的監聽

package wordcount

import org.apache.flink.streaming.api.scala._

/**
 * @description 流處理
 */
object WordCountStream {

  def main(args: Array[String]): Unit = {

    val host: String = "localhost"
    val port: Int = 8088

    // 創建流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 接收 socket 文本流
    val textDstream: DataStream[String] = env.socketTextStream(host, port)

    // flatMap 和 Map 需要引用的隱式轉換
    val dataStream: DataStream[(String, Int)] = textDstream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    dataStream.print()
    env.execute("wordcount job")
  }
}

打印結果如下,可以看到結果是累加的

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM