Flink Standalone Installation and WordCount


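Flink is a big data processing tool, similar to Hadoop and Spark. It can process data quickly on large-scale distributed systems. Like Spark, it computes in memory, and it is known for low latency and high fault tolerance. Its core feature is real-time processing of streaming data. With it, the big data ecosystem gains yet another member. A detailed walkthrough will have to wait for a later post; here is just a brief overview.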

 

 

How Flink works:

When Flink starts, it brings up one JobManager and one or more TaskManagers. The JobManager plays a role comparable to the driver in Spark, and a TaskManager is comparable to a Spark worker.

 

Flink source: http://www.apache.org/dyn/closer.lua/flink/flink-0.10.1/flink-0.10.1-src.tgz

Download the build compatible with Hadoop 2.6: http://apache.dataguru.cn/flink/flink-0.10.1/flink-0.10.1-bin-hadoop26-scala_2.10.tgz

After the download finishes, make sure a JDK is configured:

java -version

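Run bin/start-local.sh to start local mode (the default configuration under conf points to localhost; the other parameters do not need to be set for now), then check the JobManager log:

bin/start-local.sh
tail log/flink-*-jobmanager-*.log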

 

 

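Next, import the project into IDEA and run a WordCount test. This uses the examples package from the official site, so remember to add it to the project.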
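The post does not say how the examples package was added to the IDEA project. As one possible sketch, assuming an sbt build (the original may well have used Maven or a manually attached jar), the dependencies could be declared as below. The project name is hypothetical, and the artifact names are my assumption based on the Flink 0.10.1 / Scala 2.10 download above, so verify them against your repository before relying on them.

```scala
// build.sbt: a minimal sketch, not the author's actual build file.
name := "flink-wordcount-test" // hypothetical project name

// Matches the scala_2.10 binary downloaded earlier.
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Scala DataSet API (ExecutionEnvironment, DataSet, implicit type information).
  "org.apache.flink" % "flink-scala" % "0.10.1",
  // Needed to execute jobs from inside the IDE.
  "org.apache.flink" % "flink-clients" % "0.10.1",
  // Provides WordCountData, the built-in sample text used when no arguments are passed.
  "org.apache.flink" % "flink-java-examples" % "0.10.1"
)
```

With these on the classpath, the WordCount program that follows should resolve both of its imports.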

package test

import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData

/**
 * Created by root on 12/15/15.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    if (!parseParameters(args)) {
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = getTextDataSet(env)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (fileOutput) {
      counts.writeAsCsv(outputPath, "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      counts.print()
    }

  }

  private def parseParameters(args: Array[String]): Boolean = {
    if (args.length > 0) {
      fileOutput = true
      if (args.length == 2) {
        textPath = args(0)
        outputPath = args(1)
        true
      } else {
        System.err.println("Usage: WordCount <text path> <result path>")
        false
      }
    } else {
      System.out.println("Executing WordCount example with built-in default data.")
      System.out.println("  Provide parameters to read input data from a file.")
      System.out.println("  Usage: WordCount <text path> <result path>")
      true
    }
  }

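  // Reads the input file when paths were supplied, otherwise falls back to the built-in sample text.
  private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {
    if (fileOutput) {
      env.readTextFile(textPath)
    }
    else {
      env.fromCollection(WordCountData.WORDS)
    }
  }

  // Set by parseParameters; the defaults select the built-in data and console output.
  private var fileOutput: Boolean = false
  private var textPath: String = null
  private var outputPath: String = null
}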
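To see what the flatMap, map, groupBy(0), sum(1) chain in the program computes, it can help to mimic it on ordinary Scala collections, with no Flink dependency at all. The sketch below is only an analogue for illustration: the object name and the sample sentences are made up here, and a plain Map stands in for Flink's grouped DataSet.

```scala
object LocalWordCountSketch {
  // Same tokenisation as the Flink job: lower-case, split on non-word characters, drop empties.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Plain-collections analogue of flatMap -> map -> groupBy(0) -> sum(1).
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(tokenize)                 // one element per word
      .map(word => (word, 1))            // pair every word with a count of 1
      .groupBy(_._1)                     // group the pairs by the word (field 0)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts (field 1)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample input, standing in for WordCountData.WORDS.
    val sample = Seq("To be, or not to be", "that is the question")
    wordCount(sample).toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w $c") }
  }
}
```

The Flink version performs the same steps, but the grouping and summing are distributed across the TaskManagers rather than done in a single in-memory Map.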

Then run it.

