Spark算子:RDD基本轉換操作(1)–map、flatMap、distinct


Spark算子:RDD基本轉換操作(1)–map、flatMap、distinct

 

關鍵字:Spark算子、Spark RDD基本轉換、map、flatMap、distinct

  • map

將一個RDD中的每個數據項,通過map中的函數映射變為一個新的元素。

輸入分區與輸出分區一對一,即:有多少個輸入分區,就有多少個輸出分區。

  1. hadoop fs -cat /tmp/lxw1234/1.txt
  2. hello world
  3. hello spark
  4. hello hive
  5.  
  6.  
  7. //讀取HDFS文件到RDD
  8. scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
  9. data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
  10.  
  11. //使用map算子
  12. scala> var mapresult = data.map(line => line.split("\\s+"))
  13. mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
  14.  
  15. //運算map算子結果
  16. scala> mapresult.collect
  17. res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
  18.  
  19.  
  • flatMap

屬於Transformation算子,第一步和map一樣,最后將所有的輸出分區合並成一個。

  1. /使用flatMap算子
  2. scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
  3. flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
  4.  
  5. //運算flagMap算子結果
  6. scala> flatmapresult.collect
  7. res1: Array[String] = Array(hello, world, hello, spark, hello, hive)
  8.  

使用flatMap時候需要注意:
flatMap會將字符串看成是一個字符數組。
看下面的例子:

  1. scala> data.map(_.toUpperCase).collect
  2. res32: Array[String] = Array(HELLO WORLD, HELLO SPARK, HELLO HIVE, HI SPARK)
  3. scala> data.flatMap(_.toUpperCase).collect
  4. res33: Array[Char] = Array(H, E, L, L, O, , W, O, R, L, D, H, E, L, L, O, , S, P, A, R, K, H, E, L, L, O, , H, I, V, E, H, I, , S, P, A, R, K)
  5.  

再看:

  1. scala> data.map(x => x.split("\\s+")).collect
  2. res34: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive), Array(hi, spark))
  3.  
  4. scala> data.flatMap(x => x.split("\\s+")).collect
  5. res35: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
  6.  

這次的結果好像是預期的,最終結果里面並沒有把字符串當成字符數組。
這是因為這次map函數中返回的類型為Array[String],並不是String。
flatMap只會將String扁平化成字符數組,並不會把Array[String]也扁平化成字符數組。

參考:
http://alvinalexander.com/scala/collection-scala-flatmap-examples-map-flatten

  • distinct

對RDD中的元素進行去重操作。

    1. scala> data.flatMap(line => line.split("\\s+")).collect
    2. res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
    3.  
    4. scala> data.flatMap(line => line.split("\\s+")).distinct.collect
    5. res62: Array[String] = Array(hive, hello, world, spark, hi)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM