Flink is a big data processing framework in the same space as Hadoop and Spark. It is built for fast processing on large-scale distributed systems; like Spark it computes in memory, and it is known for low latency and high fault tolerance. Its core feature is real-time stream processing (a short streaming sketch appears at the end of this post). So the big data ecosystem gains yet another member... A detailed walkthrough will have to wait for a later post; here is just a quick overview~
How Flink works:
When Flink starts, it brings up one JobManager and one or more TaskManagers. The JobManager plays roughly the same role as the driver in Spark, and the TaskManagers correspond to Spark's workers.
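Both roles are configured in conf/flink-conf.yaml. As a rough sketch of the defaults in this release (double-check against your own copy), the relevant entries look like this:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1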
Flink source code: http://www.apache.org/dyn/closer.lua/flink/flink-0.10.1/flink-0.10.1-src.tgz
Download the build that is compatible with Hadoop 2.6: http://apache.dataguru.cn/flink/flink-0.10.1/flink-0.10.1-bin-hadoop26-scala_2.10.tgz
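Once downloaded, unpack the tarball and switch into it (assuming the archive unpacks to a flink-0.10.1 directory, which is the usual layout):

tar xzf flink-0.10.1-bin-hadoop26-scala_2.10.tgz
cd flink-0.10.1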
Also make sure a JDK is installed and configured:
java -version
Run bin/start-local.sh to start local mode (conf/ defaults to localhost; the other parameters can be left as-is for now). Once it is up, the JobManager web UI should also be reachable, by default at http://localhost:8081.
bin/start-local.sh
tail log/flink-*-jobmanager-*.log
Then you can import the project into IDEA for a WordCount test. The code below uses the examples package that ships with the official distribution, so remember to add it as a dependency:
package test

import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData

/**
 * Created by root on 12/15/15.
 */
object WordCount {

  // Set from the command line; with no arguments the built-in sample data is used.
  private var fileOutput = false
  private var textPath: String = null
  private var outputPath: String = null

  def main(args: Array[String]) {
    if (!parseParameters(args)) {
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = getTextDataSet(env)

    // Classic batch WordCount: tokenize, pair each word with 1, group by word, sum the counts.
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (fileOutput) {
      counts.writeAsCsv(outputPath, "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      counts.print()
    }
  }

  private def parseParameters(args: Array[String]): Boolean = {
    if (args.length > 0) {
      fileOutput = true
      if (args.length == 2) {
        textPath = args(0)
        outputPath = args(1)
        true
      } else {
        System.err.println("Usage: WordCount <text path> <result path>")
        false
      }
    } else {
      System.out.println("Executing WordCount example with built-in default data.")
      System.out.println(" Provide parameters to read input data from a file.")
      System.out.println(" Usage: WordCount <text path> <result path>")
      true
    }
  }

  private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {
    if (fileOutput) {
      // Read the user-supplied file, or fall back to the bundled sample text.
      env.readTextFile(textPath)
    } else {
      env.fromCollection(WordCountData.WORDS)
    }
  }
}
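Run with no arguments, the job counts the words in the bundled WordCountData sample and prints the result to stdout; pass a text path and a result path to read from and write to files instead. As a side note, you should also be able to submit a packaged jar to the running local cluster with the flink CLI (the jar path below is just a placeholder for whatever your build produces):

bin/flink run -c test.WordCount /path/to/your-wordcount-job.jar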
Now give it a run; with the default data, each word and its count are printed straight to the console.
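Finally, since stream processing is Flink's headline feature, here is a minimal streaming counterpart of the batch job above. This is only a sketch under assumptions: it uses the streaming Scala API of this release, the object name SocketWordCount and port 9999 are made up for illustration, and you would feed it text with something like nc -lk 9999.

package test

import org.apache.flink.streaming.api.scala._

object SocketWordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Unbounded source: one record per line arriving on the socket.
    val text = env.socketTextStream("localhost", 9999)

    // Same tokenize/pair/sum pipeline as the batch job, but over a live stream:
    // keyBy partitions the stream by word, and sum keeps a running count per word.
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)

    counts.print()

    // Unlike the batch print(), a streaming job must be kicked off explicitly.
    env.execute("Scala Streaming WordCount")
  }
}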