Flink學習(三) 批流版本的wordcount Scala版本


批處理代碼:

package com.wyh.wc

import org.apache.flink.api.scala._

/**
  * 批處理代碼
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    //創建一個批處理的一個環境
    val env = ExecutionEnvironment.getExecutionEnvironment

    val inputPath = "D:\\shujia\\shujia006\\FlinkWyh\\src\\main\\data\\word"

    val inputDataSet = env.readTextFile(inputPath)

    //分詞之后做count
    val wordcountSet = inputDataSet
      .flatMap(lines => lines.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    //打印
    wordcountSet.map(x => {
      x._1 + " " + x._2
    }).print()


  }

}

 

流處理代碼:

package com.wyh.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //創建一個流處理的執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //為了host和port不寫死,flink提供了一個方法
    val params = ParameterTool.fromArgs(args)

//    val host = params.get("host")
//
//    val port = params.getInt("port")

    //env.disableOperatorChaining()//全局打散  一個算子一個任務
    //每一個算子也會有個方法  .disableChaining() 將這個算子單獨拿出來
    //還有個方法.startNewChain() 將當前算子之前面和后面 分開

    //部署到集群中接收socket數據流
//    val dataStream: DataStream[String] = env.socketTextStream(host, port)

    //接收socket數據流
    val dataStream = env.socketTextStream("localhost", 9999)

    //逐一讀取數據,打散進行WordCount
    val wordCountStream = dataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordCountStream.print().setParallelism(1)


    //比批處理多一個步驟
    //真正執行這個任務,啟動它的Executor
    env.execute("WordCountStream")


  }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM