Flink Socket WordCount常見異常及解決方案


一.代碼實現 

package cn.socket

import org.apache.flink.streaming.api.scala._ // 數據類型異常,動態數據引入
// import org.apache.flink.api.scala._ // 數據類型異常,靜態數據引入

/**
  * Created by Administrator on 2020/3/22.
  */

object SocketWindowWordCount {
  def main(args: Array[String]) : Unit = {

    // 指定的IP和接口
    val hostname: String = "192.168.136.7"
    val port: Int = 9001

    // 獲取流處理環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 獲取socket計算數據
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    text.print()
    // wordcount
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map(w => (w, 1))
      .keyBy(_._1)
      .sum(1)

    // 設置並行度,打印
    windowCounts.print().setParallelism(1)

    // 執行
    env.execute("Socket Window WordCount")
  }
}

二.常見異常

Error:(15, 16) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
      .flatMap { w => w.split("\\s") }
               ^
Error:(15, 16) not enough arguments for method flatMap: (implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[String])
org.apache.flink.streaming.api.scala.DataStream[String]. Unspecified value parameter evidence$
11. .flatMap { w => w.split("\\s") } ^

  如圖:

  

  原因分析:

    在flink中的大部分算子中,並沒有默認的隱式類型參數的定義,我們在使用時也沒有顯式地指定類型,因此會報類型異常。

三.解決方案

import org.apache.flink.streaming.api.scala._ // 數據類型異常,動態數據引入
import org.apache.flink.api.scala._ // 數據類型異常,靜態數據引入


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM