spark的wordcount


在開發環境下實現第一個程序wordcount

1、下載和配置scala,注意不要下載2.13,在spark-core明確支持scala2.13前,使用2.12或者2.11比較好。

https://www.scala-lang.org/download/

2、windows環境下的scala配置,可選

 

 

 

 

 3、開發工具IDEA環境設置,全局環境添加scala的sdk,注意scala的源碼要手動下載和添加

 

 

 4、在IDEA中新建MAVEN項目,添加scala框架支持

 

 

 5、在MAVEN工程添加spark-core依賴,注意根據自己需要選擇對應的版本,版本不對很可能會出現運行期異常。

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.4</version>
        </dependency>
    </dependencies>

6、wordcount代碼

在項目根目錄(與src平級)中新建一個input目錄,里面放入需要統計詞頻的文本文件

package com.home.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //獲取環境
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCount")

    //獲取上下文
    val sc: SparkContext = new SparkContext(conf)

    //讀取每一行
    val lines: RDD[String] = sc.textFile("input")

    //扁平化,將每行數據拆分成單個詞(自定義業務邏輯)
    val words: RDD[String] = lines.flatMap(_.split(" "))

    //結構轉換,對每個詞獲得初始詞頻
    val wordToOne: RDD[(String, Int)] = words.map((_,1))

    //詞頻計數
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)

    //按詞頻數量降序排序
    val wordToSorted: RDD[(String, Int)] = wordToSum.sortBy(_._2,false)

    //數據輸出
    val result: Array[(String, Int)] = wordToSorted.collect()

    //打印
    result.foreach(println)

    //關閉上下文
    sc.stop()
  }
}

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM