批處理代碼:
package com.wyh.wc

import org.apache.flink.api.scala._

/**
 * Batch-processing word count.
 *
 * Reads a text file, splits each line on single spaces, counts occurrences
 * per word, and prints one "<word> <count>" line per distinct word.
 */
object WordCount {

  /** Fallback input path, kept identical to the original hard-coded value. */
  private val DefaultInputPath = "D:\\shujia\\shujia006\\FlinkWyh\\src\\main\\data\\word"

  def main(args: Array[String]): Unit = {
    // Create a batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Generalization: allow the input path to be passed as the first CLI
    // argument; with no argument the behavior is unchanged.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputDataSet = env.readTextFile(inputPath)

    // Tokenize each line, then count per word.
    val wordcountSet = inputDataSet
      .flatMap(lines => lines.split(" "))
      .map((_, 1))
      .groupBy(0) // group by the word (tuple field 0)
      .sum(1)     // sum the counts (tuple field 1)

    // Emit "<word> <count>" — same output format as before.
    wordcountSet.map(x => s"${x._1} ${x._2}").print()
  }
}
流處理代碼:
package com.wyh.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
 * Streaming word count over a socket text source.
 *
 * Connects to a host/port (configurable via --host / --port, defaulting to
 * localhost:9999), splits incoming lines on whitespace, and prints a running
 * count per word with parallelism 1.
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Create a streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Fix: ParameterTool was built from args but never used, so host/port
    // stayed hard-coded. Read them from the CLI with defaults equal to the
    // old hard-coded values — existing invocations behave identically.
    val params = ParameterTool.fromArgs(args)
    val host = params.get("host", "localhost")
    val port = params.getInt("port", 9999)

    // env.disableOperatorChaining() // globally disable chaining: one operator per task
    // Per-operator alternatives: .disableChaining() isolates one operator;
    // .startNewChain() breaks the chain before the current operator.

    // Receive the socket text stream.
    val dataStream = env.socketTextStream(host, port)

    // Tokenize each line and keep a running count per word.
    val wordCountStream = dataStream
      .flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0) // key by the word (tuple field 0)
      .sum(1)   // running sum of the counts (tuple field 1)

    wordCountStream.print().setParallelism(1)

    // Unlike the batch job, a streaming job must be started explicitly:
    // execute() launches the dataflow on the executor.
    env.execute("WordCountStream")
  }
}