package com.yh.spark import org.apache.spark.SparkConf import org.apache.spark.SparkContext object WordCount { def main(args: Array[String]): Unit = { /** * 第一步:創建Spark的配置對象SparkConf,設置Spark程序運行時的配置信息, * 例如說通過設置setMaster來設置程序要鏈接的Spark集群的Master的URL, * 如果設置為local,則代表Spark程序在本地運行。 */ val conf = new SparkConf //創建SparkConf對象 conf.setAppName("wordCount") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱 conf.setMaster("local") //此時,程序在本地運行,不需要安裝Spark集群 /** * 第二步:創建SparkContext對象 * SparkContext是Spark程序所有功能的唯一入口,無論是采用scala、java、Python,R等都 * 必須有一個SparkContext。SparkContext核心作用:初始化Spark應用程序運行所需要的核心組件,包括 * DAGScheduler,TaskScheduler、SchedulerBackend同時還會負責Spark程序往Master注冊程序等。 * SparkContext是這個Spark程序中最為至關重要的一個對象。 */ val sc = new SparkContext(conf) /** * 第三步:根據具體的數據源(HDFS、HBase、Local FS、DB、S3等)通過SparkContext創建RDD。 * RDD的創建方式有三種:根據外部的數據源(HDFS)、根據Scala集合、其他的RDD操作。數據會被RDD划分成一系列的 * Partitions,分配到每個Partition的數據屬於一個Task的處理范疇 */ val lines = sc.textFile("D://data//1.txt", 1)// /** * 第四步:對初始化的RDD進行Transformation級別處理,例如map、filter等高階函數等的編程,來進行具體的數據計算。 */ /** * 4.1、對每一行的字符串拆分成單個的單詞 */ val words = lines.flatMap { line => line.split(" ") } //對每一行的字符串進行單詞拆分並把所有行的拆分結果通過flat合並成為一 /** * 4.2、在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word,1) */ val pairs = words.map { word => (word, 1) } /** * 4.3、在每個單詞實例計數為1基礎之上統計每個單詞在文件中出現的總次數 */ val wordCounts = pairs.reduceByKey(_+_) //對相同的key,進行value的累計 wordCounts.foreach(map => println(map._1 +":"+ map._2)) sc.stop() } }