使用scala開發本地測試的Spark WordCount程序


package com.yh.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * 第一步:創建Spark的配置對象SparkConf,設置Spark程序運行時的配置信息,
     * 例如說通過設置setMaster來設置程序要鏈接的Spark集群的Master的URL,
     * 如果設置為local,則代表Spark程序在本地運行。
     */
    val conf = new SparkConf //創建SparkConf對象
    conf.setAppName("wordCount") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱
    conf.setMaster("local") //此時,程序在本地運行,不需要安裝Spark集群
    
    /**
     * 第二步:創建SparkContext對象
     * SparkContext是Spark程序所有功能的唯一入口,無論是采用scala、java、Python,R等都
     * 必須有一個SparkContext。SparkContext核心作用:初始化Spark應用程序運行所需要的核心組件,包括
     * DAGScheduler,TaskScheduler、SchedulerBackend同時還會負責Spark程序往Master注冊程序等。
     * SparkContext是這個Spark程序中最為至關重要的一個對象。
     */
    val sc = new SparkContext(conf)
    
    /**
     * 第三步:根據具體的數據源(HDFS、HBase、Local FS、DB、S3等)通過SparkContext創建RDD。
     * RDD的創建方式有三種:根據外部的數據源(HDFS)、根據Scala集合、其他的RDD操作。數據會被RDD划分成一系列的
     * Partitions,分配到每個Partition的數據屬於一個Task的處理范疇
     */
    val lines = sc.textFile("D://data//1.txt", 1)//
    
    /**
     * 第四步:對初始化的RDD進行Transformation級別處理,例如map、filter等高階函數等的編程,來進行具體的數據計算。
     */
    /**
     * 4.1、對每一行的字符串拆分成單個的單詞
     */
    val words = lines.flatMap { line => line.split(" ") } //對每一行的字符串進行單詞拆分並把所有行的拆分結果通過flat合並成為一
    
    /**
     * 4.2、在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word,1)
     */
    val pairs = words.map { word => (word, 1) }
    
    /**
     * 4.3、在每個單詞實例計數為1基礎之上統計每個單詞在文件中出現的總次數
     */
    val wordCounts = pairs.reduceByKey(_+_) //對相同的key,進行value的累計
    
    wordCounts.foreach(map => println(map._1 +":"+ map._2))
    
    sc.stop()
  }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM