Spark入門1(以WordCount為例講解flatmap和map之間的區別)


 1 package com.test
 2 
 3 
 4 import org.apache.spark.{SparkConf, SparkContext}
 5 
 6 
 7 object WordCount {
 8   def main(args: Array[String]) {
 9     /**
10       * 第1步;創建Spark的配置對象SparkConf,設置Spark程序運行時的配置信息
11       * 例如 setAppName用來設置應用程序的名稱,在程序運行的監控界面可以看到該名稱,
12       * setMaster設置程序運行在本地還是運行在集群中,運行在本地可是使用local參數,也可以使用local[K]/local[*],
13       * 可以去spark官網查看它們不同的意義。 如果要運行在集群中,以Standalone模式運行的話,需要使用spark://HOST:PORT
14       * 的形式指定master的IP和端口號,默認是7077
15       */
16     val conf = new SparkConf().setAppName("WordCount").setMaster("local")
17     //  val conf = new SparkConf().setAppName("WordCount").setMaster("spark://master:7077")  // 運行在集群中
18 
19     /**
20       * 第2步:創建SparkContext 對象
21       * SparkContext是Spark程序所有功能的唯一入口
22       * SparkContext核心作用: 初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackend
23       * 同時還會負責Spark程序往Master注冊程序
24       *
25       * 通過傳入SparkConf實例來定制Spark運行的具體參數和配置信息
26       */
27     val sc = new SparkContext(conf)
28 
29     /**
30       * 第3步: 根據具體的數據來源(HDFS、 HBase、Local FS、DB、 S3等)通過SparkContext來創建RDD
31       * RDD 的創建基本有三種方式: 根據外部的數據來源(例如HDFS)、根據Scala集合使用SparkContext的parallelize方法、
32       * 由其他的RDD操作產生
33       * 數據會被RDD划分成為一系列的Partitions,分配到每個Partition的數據屬於一個Task的處理范疇
34       */
35 
36     val lines = sc.textFile("D:/wordCount.txt")   // 讀取本地文件
37     //  val lines = sc.textFile("/library/wordcount/input")   // 讀取HDFS文件,並切分成不同的Partition
38     //  val lines = sc.textFile("hdfs://master:9000/libarary/wordcount/input")  // 或者明確指明是從HDFS上獲取數據
39 
40     /**
41       * 第4步: 對初始的RDD進行Transformation級別的處理,例如 map、filter等高階函數來進行具體的數據計算
42       */
43     val words = lines.flatMap(_.split(" ")).filter(word => word != " ")  // 拆分單詞,並過濾掉空格,當然還可以繼續進行過濾,如去掉標點符號
44 
45     val pairs = words.map(word => (word, 1))  // 在單詞拆分的基礎上對每個單詞實例計數為1, 也就是 word => (word, 1)
46 
47     val wordscount = pairs.reduceByKey(_ + _)  // 在每個單詞實例計數為1的基礎之上統計每個單詞在文件中出現的總次數, 即key相同的value相加
48     //  val wordscount = pairs.reduceByKey((v1, v2) => v1 + v2)  // 等同於
49 
50     wordscount.collect.foreach(println)  // 打印結果,使用collect會將集群中的數據收集到當前運行drive的機器上,需要保證單台機器能放得下所有數據
51 
52     sc.stop()   // 釋放資源
53 
54   }
55 }
 1 package com.test
 2 
 3 
 4 import org.apache.spark.{SparkConf, SparkContext}
 5 
 6 
 7 object WordCount {
 8   def main(args: Array[String]) {
 9     /**
10       * 第1步;創建Spark的配置對象SparkConf,設置Spark程序運行時的配置信息
11       * 例如 setAppName用來設置應用程序的名稱,在程序運行的監控界面可以看到該名稱,
12       * setMaster設置程序運行在本地還是運行在集群中,運行在本地可是使用local參數,也可以使用local[K]/local[*],
13       * 可以去spark官網查看它們不同的意義。 如果要運行在集群中,以Standalone模式運行的話,需要使用spark://HOST:PORT
14       * 的形式指定master的IP和端口號,默認是7077
15       */
16     val conf = new SparkConf().setAppName("WordCount").setMaster("local")
17     //  val conf = new SparkConf().setAppName("WordCount").setMaster("spark://master:7077")  // 運行在集群中
18 
19     /**
20       * 第2步:創建SparkContext 對象
21       * SparkContext是Spark程序所有功能的唯一入口
22       * SparkContext核心作用: 初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackend
23       * 同時還會負責Spark程序往Master注冊程序
24       *
25       * 通過傳入SparkConf實例來定制Spark運行的具體參數和配置信息
26       */
27     val sc = new SparkContext(conf)
28 
29     /**
30       * 第3步: 根據具體的數據來源(HDFS、 HBase、Local FS、DB、 S3等)通過SparkContext來創建RDD
31       * RDD 的創建基本有三種方式: 根據外部的數據來源(例如HDFS)、根據Scala集合使用SparkContext的parallelize方法、
32       * 由其他的RDD操作產生
33       * 數據會被RDD划分成為一系列的Partitions,分配到每個Partition的數據屬於一個Task的處理范疇
34       */
35 
36     val lines = sc.textFile("D:/data/kddcup.data_10_percent_corrected")   // 讀取本地文件
37     //  val lines = sc.textFile("/library/wordcount/input")   // 讀取HDFS文件,並切分成不同的Partition
38     //  val lines = sc.textFile("hdfs://master:9000/libarary/wordcount/input")  // 或者明確指明是從HDFS上獲取數據
39 
40     /**
41       * 第4步: 對初始的RDD進行Transformation級別的處理,例如 map、filter等高階函數來進行具體的數據計算
42       */
43     println("words")
44     //val words = lines.flatMap(_.split(" ")) // flatMap是將整個lines文件中的字母做拆分,返回的是一整個拆分后的list
45 
46 
47     val pairs = lines.map(word => (word.split(",")(41), 1))  // Map是按行拆分,找到每行的第41個,實例計數為1,返回的是一個大list里面套了小的list
48 
49     val wordscount = pairs.reduceByKey(_ + _)  // 在每個單詞實例計數為1的基礎之上統計每個單詞在文件中出現的總次數, 即key相同的value相加
50     //  val wordscount = pairs.reduceByKey((v1, v2) => v1 + v2)  // 等同於
51 
52     wordscount.collect.foreach(println)  // 打印結果,使用collect會將集群中的數據收集到當前運行drive的機器上,需要保證單台機器能放得下所有數據
53 
54     sc.stop()   // 釋放資源
55 
56   }
57 }

  博客中有兩段很長的代碼,我們重點關注第一段的43行和第二段的47行,我們可以看到第一段用了flatmap而第二段用了map。那這之間有什么區別呢?

  第一段代碼是以空格為間隔符讀取統計txt文檔中出現的單詞數量,其中要注意的是行與行之間的分隔符也是“ ”,所以它只用一個flatmap就可以搞定,將所有單詞用“ ”分割,取出,統計數量。而第二段代碼是以“,”為分隔符統計每一行第41個單詞的數量,這里就不能用flatmap了,因為flatmap是將整個文件的單詞整合起來成為一個list,與map不同的是flatmap多加了一個flat(映射)的功能,所以我們就找不到第41個單詞了。這里用map,最后沒有映射,輸出的是一個大list里面套了很多小list,每一個小list代表一行,所以我們就可以操作這些小list去找到第41個單詞並統計。

來自博客:

http://blog.csdn.net/dwb1015/article/details/52013362


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM