1.spark的wordcount解析


一、Eclipse(scala IDE)開發local和cluster

(一). 配置開發環境

  1. 要在本地安裝好java和scala。 
    由於spark1.6需要scala 2.10.X版本的。推薦 2.10.4,java版本最好是1.8。所以提前我們要需要安裝好java和scala並在環境變量中配置好。
  2. 下載scala IDE for eclipse安裝 連接:http://scala-ide.org/download/sdk.html 
    打開ide新建scala project 
    點擊file -> new ->Scala Project ,在彈出的對話框中彈性project name 為“WordCount”,默認點擊next,點擊finish的。
  3. 修改Scala版本 
    項目創建完成后默認使用的是scala的2.11.7 版本。要手動將版本換成2.10.X。在項目名稱右擊選擇properties,在彈出窗口點擊,scala Compiler,在右側窗口,選中Use Project settings, 將scala Installation 修改為Latest 2.10 bundle(dynamic).點擊apply,點擊ok。scala版本變成2.10.6。

  4. 找到依賴的spark jar文件並導入到eclipse中。 
    所依賴的jar文件是 
    spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。 
    在項目名稱上右擊,選擇build path ->configure build path。在彈出框中點擊library,點擊右側的addExternalJARs,然后選擇 
    park-assembly-1.6.0-hadoop2.6.0.jar點擊打開,然后點擊ok。

(二)、spark程序開發步驟

1. 在src下建立spark程序工程包

在src上右擊new ->package 填入package的name為com.dt.spark。

2. 創建scala的入口類。

在包的名字上右擊選擇new ->scala class 。在彈出框中Name 中,在增加WordCount。點擊finish。 
在方法內部講關鍵字class 改成object ,然后創建main方法。

3. local模式代碼方法

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. def main(args: Array[String]): Unit ={
    4. * 集群的master的URL,如果設置為local則在本地運行。
    5. val conf = new SparkConf()
    6. conf.setMaster("local")
    7. /**第2步,創建SparkContext對象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark應用程序的
    8. * */
    9. * 數據被RDD划分為一系列的Partitions,分配到每個partition的數據屬於一個Task的處理范疇
    10. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //讀取本地文件並設置一個partition
    11. /**第4步,對初始的RDD進行Transformation級別的處理,如map、filter高階函數編程,進行具體計算
    12. val words = lines.flatMap{ line => line.split(" ")}//對每行字符串進行單詞拆分,並把所有拆分結果通過flat合並成一個大的單詞集合
    13. (word, 1)} //在單詞拆分基礎上對每個單詞實例計數為1
    14. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
    15. }
    16. 在運行過程中會出現WARN NativeCodeLoader: Unable to load native-Hadoop library for your platform... using builtin-Java classes where applicable。Java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 這個錯誤。但是在local模式下,這個是正常的。因為spark是和hadoop編譯在一起的,我們在window 下開發,缺少hadoop的配置。這不是程序錯誤,也不影響我們的任何功能。

       

      4.編寫Cluster模式代碼

      1. import org.apache.spark.SparkContext
      2. def main(args: Array[String]){
      3. * 集群的master的URL,如果設置為local則在本地運行。
      4. val conf = new SparkConf() //創建SparkConf對象
      5. // conf.setMaster("spark://master:7077")
      6. /**第2步,創建SparkContext對象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark應用程序的
      7. * */
      8. /**第3步,根據數據源(HDFS,HBase,Local FS)通過SparkContext來創建RDD
      9. * */
      10. val lines = sc.textFile("/library/wordcount/input/Data") //讀取HDFS文件並切分成不同的Partions
      11. /**第4步,對初始的RDD進行Transformation級別的處理,如map、filter高階函數編程,進行具體計算
      12. val words = lines.flatMap { line =>line.split(" ")} //對每一行的字符串進行單詞拆分並把所有行的拆分結果通過flat合並成為一個大的單詞集合
      13. val pairs = words.map { word => (word, 1) }
      14. wordCounts.collect.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
      15. }
      16. 將程序達成jar 包 
        在項目名稱上右擊點擊export選擇java 下的jar file,點擊next,選擇輸出目錄,輸入文件名,點擊next,點擊next,然后點擊完成。導出jar 包。

         

        將jar 放到Linux系統某個目錄中。執行 
        ./spark-submit --class com.dt.spark.WordCount_Cluster --master spark://worker1:7077 ./wordcount.jar

         

        也可以將以上命令保存到.sh文件中,直接執行sh文件即可。

        二、使用idea開發spark的Local和Cluster

        (一)、配置開發環境

        1. 要在本地安裝好java和scala。

        由於spark1.6需要scala 2.10.X版本的。推薦 2.10.4,java版本最好是1.8。所以提前我們要需要安裝好java和scala並在環境變量中配置好

        2. 下載IDEA 社區版本,選擇windows 版本並按照配置。

        安裝完成以后啟動IDEA,並進行配置,默認即可,然后點擊ok以后,設置ui風格,然后點擊next 會出現插件的選擇頁面,默認不需求修改,點擊next,選擇安裝scala語言,點擊install 按鈕(非常重要,以為要開發spark程序所以必須安裝),等安裝完成以后點擊start啟動IDEA。

        3. 創建scala項目

        點擊 create new project ,然后填寫project name為“Wordcount”,選擇項目的保存地址project location。 
        然后設置project sdk即java 的安裝目錄。點擊右側的new 按鈕,選擇jdk,然后選擇java 的安裝路徑即可。 
        然后選擇scalasdk。點擊右側的create ,默認出現時2.10.x 版本的scala,點擊ok即可。然后點擊finish。

        4. 設置spark的jar 依賴。

        點擊file->project structure 來設置工程的libraries。核心是添加spark的jar依賴。選擇Libraries ,點擊右側的加號,選擇java,選擇spark1.6.0 的spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。點擊ok。稍等片刻后然后點擊ok(Libraries作用於WordCount),然后點擊apply,點擊ok。(這一步很重要,如果沒有無法編寫spark的代碼)

        (二)、編寫代碼

        1. 在src下建立spark程序工程包

        在src上右擊new ->package 填入package的name為com.dt.spark。

        2. 創建scala的入口類。

        在包的名字上右擊選擇new ->scala class 。在彈出框中填寫Name ,並制定kind為object ,點擊ok。

        3. 編寫local代碼

        1. import org.apache.spark.SparkConf
        2. import org.apache.spark.rdd.RDD
        3. def main(args: Array[String]): Unit ={
        4. * 集群的master的URL,如果設置為local則在本地運行。
        5. val conf = new SparkConf()
        6. conf.setMaster("local")
        7. /**第2步,創建SparkContext對象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark應用程序的
        8. * */
        9. * 數據被RDD划分為一系列的Partitions,分配到每個partition的數據屬於一個Task的處理范疇
        10. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //讀取本地文件並設置一個partition
        11. /**第4步,對初始的RDD進行Transformation級別的處理,如map、filter高階函數編程,進行具體計算
        12. val words = lines.flatMap{ line => line.split(" ")}//對每行字符串進行單詞拆分,並把所有拆分結果通過flat合並成一個大的單詞集合
        13. (word, 1)} //在單詞拆分基礎上對每個單詞實例計數為1
        14. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
        15. }
        16. 在代碼去右擊選擇點擊run”wordCount”來運行程序。在生成環境下肯定是寫自動化shell 腳本自動提交程序的。 
          注意:如果val sc = new SparkContext(conf)報錯,並且沒有運行結果,需要將scala的module改成scala 2.10版本的。具體操作:File->project structure -> Dependencies ->刪除scala 2.11.x的module.-> 左上角的“+” -> scala ->選中scala2.10.4 -> apply

           

          4. 編寫Cluster模式代碼

          1. import org.apache.spark.SparkConf
          2. import org.apache.spark.rdd.RDD
          3. def main(args: Array[String]): Unit ={
          4. * 集群的master的URL,如果設置為local則在本地運行。
          5. val conf = new SparkConf()
          6. //conf.setMaster("spark://master:7077")
          7. * 核心組件,包括DAGScheduler,TaskScheduler,SchedulerBackend
          8. val sc = new SparkContext(conf)
          9. /**第3步,根據數據源(HDFS,HBase,Local FS)通過SparkContext來創建RDD
          10. * */
          11. /**第4步,對初始的RDD進行Transformation級別的處理,如map、filter高階函數編程,進行具體計算
          12. val words = lines.flatMap{ line => line.split(" ")}//對每行字符串進行單詞拆分,並把所有拆分結果通過flat合並成一個大的單詞集合
          13. (word, 1)} //在單詞拆分基礎上對每個單詞實例計數為1
          14. pairs._2, pairs._1)).sortByKey(false).map(pair=>(pair._1, pair._2))//相同的key,value累加並且排名
          15. println(wordNumberPair._1 + ":" + wordNumberPair._2))
          16. }
          17. 將程序達成jar 包 
            點擊file->project structure,在彈出的頁面點擊Artifacts,點擊右側的“+”,選擇jar –> from modules with dependencies,在彈出的頁面中,設置好main class 然后點擊ok,在彈出頁面修改Name(系統生成的name不規范)、導出位置並刪除scala和spark的jar(因為集群環境中已經存在)點擊ok 。然后在菜單欄中點擊build –> Artifacts ,在彈出按鈕中,點擊bulid,會自動開始打包。

             

            在spark中執行wordcount方法。 
            將jar 放到linux系統某個目錄中。執行

            1. 注意事項: 
              為什么不能再ide開發環境中,直接發布spark程序到spark集群中? 
              1. 開發機器的內存和cores的限制,默認情況情況下,spark程序的dirver在提交spark程序的機器上,如果在idea中提交程序的話,那idea機器就必須非常強大。 
              2. Dirver要指揮workers的運行並頻繁的發生同學,如果開發環境和spark集群不在同樣一個網絡下,就會出現任務丟失,運行緩慢等多種不必要的問題。 
              3. 這是不安全的。

               

              三、WordCount的java開發版本

               
            2. 安裝jdk並配置環境變量 
              系統變量→新建 JAVA_HOME 變量。 
              變量值填寫jdk的安裝目錄(本人是 E:\Java\jdk1.7.0) 
              系統變量→尋找 Path 變量→編輯 
              在變量值最后輸入 %JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;(注意原來Path的變量值末尾有沒有;號,如果沒有,先輸入;號再輸入上面的代碼) 
              系統變量→新建 CLASSPATH 變量值填寫 .;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar(注意最前面有一點)
            3. Maven的安裝和配置 
              解壓apache-maven-3.1.1-bin.zip,並把解壓后的文件夾下的apache-maven-3.1.1文件夾移動到D:\Java下,如果沒有Java這個文件夾的話,請自行創建 
              新建系統變量 MAVEN_HOME 變量值:D:\Java\apache-maven-3.1.1。編輯系統變量 Path 添加變量值: ;%MAVEN_HOME%\bin。 
              在mave 的目錄中修改conf/settings.xml,在localRepository屬性后添加D:/repository修改maven下載jar 的位置。
            4. eclipse 中java 和maven 的配置 
              點擊 window ->java ->Installed JREs ->add ->standard vm ,點擊next ,然后選擇jdk 的安裝路徑點擊finish即可。 
              點擊window ->Maven ->Installations ->add 在彈出頁面選擇mave 的安裝路徑,然后點擊finish。然后在列表中選擇我們自己剛添加的那個maven信息。 
              然后點擊window ->Maven ->User Setings 在右側的User Settings 點擊browse 現在mavenconf目錄下的setttings.xml .(主要是修改maven下載依賴包存放的位置)
            (二). 創建maven項目
            1. 創建maven項目 
              點擊file ¬->new ->others ->maven project 點擊next,選擇maven-archetype-quickstart ,點擊next,group id 為 com.dt.spark,artifact id 為 sparkApps,然后點擊finish。
            2. 修改jdk和pom文件 
              創建maven項目后,默認的jdk是1.5要改成我們前面安裝好的jdk1.8。在項目上右擊build path ->configure build path 。在彈出頁面點擊Libraries,選中jre system library 。點擊edit,在彈出框選擇workspace default jre ,然后點擊finish。然后在點擊ok。將pom文件修改為如下內容,然后等待eclipse下載好maven依賴的jar包,並編譯工程。編譯好工程后有個錯誤提示,在此錯誤列上,右擊選擇quick fix ,在彈出頁面點擊finish即可。
            1. xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            2. 4.0.0</modelVersion>
            3. <groupId>com.dt.spark</groupId>
            4. SparkApps</artifactId>
            5. 0.0.1-SNAPSHOT</version>
            6. jar</packaging>
            7. <name>SparkApps</name>
            8. http://maven.apache.org</url>
            9. <properties>
            10. UTF-8</project.build.sourceEncoding>
            11. <dependencies>
            12. junit</groupId>
            13. junit</artifactId>
            14. 3.8.1</version>
            15. test</scope>
            16. org.apache.spark</groupId>
            17. spark-core_2.10</artifactId>
            18. 1.6.0</version>
            19. org.apache.spark</groupId>
            20. spark-sql_2.10</artifactId>
            21. 1.6.0</version>
            22. org.apache.spark</groupId>
            23. spark-hive_2.10</artifactId>
            24. 1.6.0</version>
            25. org.apache.spark</groupId>
            26. spark-streaming_2.10</artifactId>
            27. 1.6.0</version>
            28. org.apache.hadoop</groupId>
            29. hadoop-client</artifactId>
            30. 2.6.0</version>
            31. org.apache.spark</groupId>
            32. spark-streaming-kafka_2.10</artifactId>
            33. 1.6.0</version>
            34. org.apache.spark</groupId>
            35. spark-graphx_2.10</artifactId>
            36. 1.6.0</version>
            37. <build>
            38. src/main/java</sourceDirectory>
            39. src/main/test</testSourceDirectory>
            40. <plugins>
            41. maven-assembly-plugin</artifactId>
            42. jar-with-dependencies</descriptorRef>
            43. make-assembly</id>
            44. package</phase>
            45. single</goal>
            46. org.codehaus.mojo</groupId>
            47. exec-maven-plugin</artifactId>
            48. 1.3.1</version>
            49. exec</goal>
            50. java</executable>
            51. false</includeProjectDependencies>
            52. compile</classpathScope>
            53. com.dt.spark.SparkApps.WordCount</mainClass>
            54. org.apache.maven.plugins</groupId>
            55. maven-compiler-plugin</artifactId>
            56. 1.6</source>
            57. 1.6</target>
            58. </project>
            1. 創建包路徑以及java代碼 
              在包路徑com.dt.spark.SparkApps上右擊 new ->package 在彈出頁面name中填寫com.dt.spark.SparkApps.cores,點擊finish的。 
              在包路徑下com.dt.spark.SparkApps.cores上右擊 new ->class ,在彈出窗口中name中填寫WordCount ,點擊finish。然后在WordCount 中編寫如下代碼。
            (三). local版本
            1. import java.util.Arrays;
            2. import scala.Function;
            3. public static void main(String[] args){
            4. //其底層就是scala的SparkContext
            5. String> lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md");
            6. String> words = lines.flatMap(new FlatMapFunction<String, String>(){
            7. public Iterable<String> call(String line)throws Exception{
            8. });
            9. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
            10. public Tuple2<String, Integer> call(String word)throws Exception{
            11. String, Integer>(word, 1);
            12. });
            13. JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){ //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)
            14. public Integer call(Integer v1, Integer v2)throws Exception{
            15. });
            16. wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
            17. public void call(Tuple2<String, Integer>pair)throws Exception{
            18. });
            19. }

          在代碼區右擊run as -> java application 。來運行此程序並查看運行結果。

          (四). cluster版本的代碼
          1. import java.util.Arrays;
          2. import scala.Function;
          3. public static void main(String[] args){
          4. String> lines = sc.textFile("/library/wordcount/input/Data");
          5. String> words = lines.flatMap(new FlatMapFunction<String, String>(){
          6. public Iterable<String> call(String line)throws Exception{
          7. });
          8. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
          9. public Tuple2<String, Integer> call(String word)throws Exception{
          10. String, Integer>(word, 1);
          11. });
          12. JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
          13. public Integer call(Integer v1, Integer v2)throws Exception{
          14. });
          15. wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
          16. public void call(Tuple2<String, Integer>pair)throws Exception{
          17. });
          18. }

        四、徹底解析wordcount運行原理

        1. 從數據流動視角解密WordCount

        即用Spark作單詞計數統計,數據到底是怎么流動的,參看一圖: 
        從數據流動的視角分析數據到底是怎么被處理

        1. word,1)).reduceByKey(_+_).saveAsTextFile(outputPathwordcount)

        簡單實驗

        (1)在IntelliJ IDEA中編寫下面代碼:

        1. import org.apache.spark.SparkConf
        2. object WordCount {
        3. valconf = new SparkConf()
        4. conf.setMaster("local")
        5. val lines = sc.textFile("D://tmp//helloSpark.txt", 1)
        6. line.split(" ") }
        7. (word,1) }
        8. wordCounts.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
        9. }
        10. 2)在D盤下地tmp文件夾下新建helloSpark.txt文件,內容如下:
        11. Hello Hadoop
        12. Spark is awesome
        13. Flink : 1
        14. is : 1
        15. awesome : 1
        16. Scala : 1

        Spark有三大特點:

        1. 分布式。無論數據還是計算都是分布式的。默認分片策略:Block多大,分片就多大。但這種說法不完全准確,因為分片切分時有的記錄可能跨兩個Block,所以一個分片不會嚴格地等於Block的大小,例如HDFS的Block大小是128MB的話,分片可能多幾個字節或少幾個字節。一般情況下,分片都不會完全與Block大小相等。 
          分片不一定小於Block大小,因為如果最后一條記錄跨兩個Block的話,分片會把最后一條記錄放在前一個分片中。
        2. 基於內存(部分基於磁盤)
        3. 迭代

        查看在SparkContext.scala中的testFile源碼

        1. path: String,
        2. assertNotStopped()
        3. minPartitions).map(pair => pair._2.toString)
        4. 可以看出在進行了hadoopFile之后又進行了map操作。 
          HadoopRDD從HDFS上讀取分布式文件,並且以數據分片的方式存在於集群之中。

           

          RDD.scala中的map源碼

          1. * Return a new RDD by applying a function to all elements of this RDD.
          2. def map[U: ClassTag](f: T => U): RDD[U] = withScope {
          3. new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
          4. 讀取到的一行數據(key,value的方式),對行的索引位置不感興趣,只對其value事情興趣。pair時有個匿名函數,是個tuple,取第二個元素。 
            此處又產生了MapPartitionsRDD。MapPartitionsRDD基於hadoopRDD產生的Parition去掉行的KEY。 
            注:可以看出一個操作可能產生一個RDD也可能產生多個RDD。如sc.textFile就產生了兩個RDD:hadoopRDD和MapParititionsRDD。
            下一步:

             

            1. line.split(" ") }

            對每個Partition中的每行進行單詞切分,並合並成一個大的單詞實例的集合。 
            FlatMap做的一件事就是對RDD中的每個Partition中的每一行的內容進行單詞切分。 
            這邊有4個Partition,對單詞切分就變成了一個一個單詞,

            下面是FlatMap的源碼(RDD.scala中)

            1. * Return a new RDD by first applying a function to all elements of this
            2. */
            3. TraversableOnce[U]): RDD[U] = withScope {
            4. new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
            5. 可以看出flatMap又產生了一個MapPartitionsRDD,此時的各個Partition都是拆分后的單詞。 
              下一步:

               

              1. (word,1) }

              將每個單詞實例變為形如word=>(word,1) 
              map操作就是把切分后的每個單詞計數為1。 
              根據源碼可知,map操作又會產生一個MapPartitonsRDD。此時的MapPartitionsRDD是把每個單詞變成Array(""Hello",1),("Spark",1)等這樣的形式。 
              下一步:

              1. reduceByKey是進行全局單詞計數統計,對相同的key的value相加,包括local和reducer同時進行reduce。所以在map之后,本地又進行了一次統計,即local級別的reduce。 
                shuffle前的Local Reduce操作,主要負責本地局部統計,並且把統計后的結果按照分區策略放到不同的File。 
                下一Stage就叫Reducer了,下一階段假設有3個並行度的話,每個Partition進行Local Reduce后都會把數據分成三種類型。最簡單的方式就是用HashCode對其取模。 
                至此都是stage1。 
                Stage內部完全基於內存迭代,不需要每次操作都有讀寫磁盤,所以速度非常快。

                 

                reduceByKey的源碼(PairRDDFunctions.scala中):

                1. V): RDD[(K, V)] = self.withScope {
                2. v, func, func, partitioner)
                3. * Merge the values for each key using an associative and commutative reduce function. This will
                4. * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
                5. def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
                6. }
                7. /**
                8. * also perform the merging locally on each mapper before sending results to a reducer, similarly
                9. * parallelism level.
                10. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
                11. }

                可以看到reduceByKey內部有combineByKeyWithClassTag。combineByKeyWithClassTag的源碼如下:

                1. createCombiner: V => C,
                2. C,
                3. C,
                4. mapSideCombine: Boolean = true,
                5. require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
                6. if (mapSideCombine) {
                7. }
                8. throw new SparkException("Default partitioner cannot partition array keys.")
                9. }
                10. self.context.clean(createCombiner),
                11. self.context.clean(mergeCombiners))
                12. self.mapPartitions(iter => {
                13. new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
                14. } else {
                15. .setSerializer(serializer)
                16. .setMapSideCombine(mapSideCombine)
                17. }

                可以看出在combineByKeyWithClassTag內又new 了一個ShuffledRDD。 
                ReduceByKey有兩個作用: 
                1. 進行Local級別的Reduce,減少網絡傳輸。 
                2. 把當前階段的內容放到本地磁盤上供shuffle使用。

                下一步是shuffledRDD,

                產生Shuffle數據就需要進行分類,MapPartitionsRDD時其實已經分好類了,最簡單的分類策略就是Hash分類。 
                ShuffledRDD需要從每台機上抓取同一單詞。 
                reduceByKey發生在哪里? 
                Stage2全部都是reduceByKey

                最后一步:保存數據到HDFS(MapPartitionsRDD)

                統計完的結果:(“Hello”,4)只是一個Value,而不是Key:"Hello",value:4。但輸出到文件系統時需要KV的格式,現在只有Value,所以需要造個KEY。

                saveAsTextFile的源碼:

                1. this.map(x => (NullWritable.get())),new Text(x.toStirng))
                2. }

                this.map把當前的值(x)變成tuple。tuple的Key是Null,Value是(“Hello”,4)。 
                為什么要為樣?因為saveAsHadoopFile時要求以這樣的格式輸出。Hadoop需要KV的格式!! 
                map操作時把key舍去了,輸出時就需要通過生成Key。 
                第一個Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD 
                第二個Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD

                只有Collect 或saveAsTextFile會觸發作業,其他的時候都沒有觸發作業(Lazy)

                2. 從RDD依賴關系的視角解密WordCount。Spark中的一切操作皆RDD,后面的RDD對前面的RDD有依賴關系。

                3. DAG與Lineage的思考。依賴關系會形成DAG。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM