一.代碼實現
package cn.socket import org.apache.flink.streaming.api.scala._ // 數據類型異常,動態數據引入 // import org.apache.flink.api.scala._ // 數據類型異常,靜態數據引入 /** * Created by Administrator on 2020/3/22. */ object SocketWindowWordCount { def main(args: Array[String]) : Unit = { // 指定的IP和接口 val hostname: String = "192.168.136.7" val port: Int = 9001 // 獲取流處理環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 獲取socket計算數據 val text: DataStream[String] = env.socketTextStream(hostname, port, '\n') text.print() // wordcount val windowCounts = text .flatMap { w => w.split("\\s") } .map(w => (w, 1)) .keyBy(_._1) .sum(1) // 設置並行度,打印 windowCounts.print().setParallelism(1) // 執行 env.execute("Socket Window WordCount") } }
二.常見異常
Error:(15, 16) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String] .flatMap { w => w.split("\\s") } ^
Error:(15, 16) not enough arguments for method flatMap: (implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[String])
org.apache.flink.streaming.api.scala.DataStream[String]. Unspecified value parameter evidence$11. .flatMap { w => w.split("\\s") } ^
如圖:
原因分析:
在flink中的大部分算子中,並沒有默認的隱式類型參數的定義,我們在使用時也沒有顯式地指定類型,因此會報類型異常。
三.解決方案
import org.apache.flink.streaming.api.scala._ // 數據類型異常,動態數據引入 import org.apache.flink.api.scala._ // 數據類型異常,靜態數據引入