5、創建RDD(集合、本地文件、HDFS文件)


一、創建RDD

1、創建RDD

進行Spark核心編程時,首先要做的第一件事,就是創建一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程序的輸入源數據。然后在創建了初始的RDD之后,才可以通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。


Spark Core提供了三種創建RDD的方式,包括:使用程序中的集合創建RDD;使用本地文件創建RDD;使用HDFS文件創建RDD。




1、使用程序中的集合創建RDD,主要用於進行測試,可以在實際部署到集群運行之前,自己使用集合構造測試數據,來測試后面的spark應用的流程。


2、使用本地文件創RDD,主要用於臨時性地處理一些存儲了大量數據的文件。


3、使用HDFS文件創建RDD,應該是最常用的生產環境處理方式,主要可以針對HDFS上存儲的大數據,進行離線批處理操作。

 

2、並行化集合創建RDD

如果要通過並行化集合來創建RDD,需要針對程序中的集合,調用SparkContext的parallelize()方法。Spark會將集合中的數據拷貝到集群上去,形成一個分布式的數據集合,也就是一個RDD。相當於是,集合中的部分數據會到一個節點上,而另一部分數據會到其他節點上。然后就可以用並行的方式來操作這個分布式數據集合,即RDD。

調用parallelize()時,有一個重要的參數可以指定,就是要將集合切分成多少個partition。Spark會為每一個partition運行一個task來進行處理。Spark官方的建議是,為集群中
的每個CPU創建2~4個partition。Spark默認會根據集群的情況來設置partition的數量。但是也可以在調用parallelize()方法時,傳入第二個參數,來設置RDD的partition數量。
比如parallelize(arr, 10)



// 案例:1到10累加求和

###java實現
package cn.spark.study.core;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

/**
 * 並行化集合創建RDD
 * 
 * @author bcqf
 *
 */

public class ParallelizeCollection {
    public static void main(String[] args) {
        // 創建SparkConf
        SparkConf conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local");

        // 創建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 要通過並行化集合的方式創建RDD,那么就調用SparkContext以及其子類,的parallelize()方法
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        
        // 執行reduce算子操作
                // 相當於,先進行1 + 2 = 3;然后再用3 + 3 = 6;然后再用6 + 4 = 10...以此類推;
        int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer num1, Integer num2) throws Exception {
                // TODO Auto-generated method stub
                return num1 + num2;
            }
            
        });
        //輸入累加和
        System.out.println("1-10的累加和:" + sum);
        
        // 關閉JavaSparkContext
        sc.close();
    }
}




###scala實現
package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object ParallelizeCollection {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local");

    val sc = new SparkContext(conf)

    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    val numberRDD = sc.parallelize(numbers, 5)
    
    val sum = numberRDD.reduce(_ + _)
    
    println("1-10的累加和:" + sum)
  }

}

 

3、使用本地文件和HDFS創建RDD

Spark是支持使用任何Hadoop支持的存儲系統上的文件創建RDD的,比如說HDFS、Cassandra、HBase以及本地文件。通過調用SparkContext的textFile()方法,可以針對本地文件或HDFS文件創建RDD。

有幾個事項是需要注意的:
1、如果是針對本地文件的話,如果是在windows上本地測試,windows上有一份文件即可;如果是在spark集群上針對linux本地文件,那么需要將文件拷貝到所有worker節點上。
2、Spark的textFile()方法支持針對目錄、壓縮文件以及通配符進行RDD創建。
3、Spark默認會為hdfs文件的每一個block創建一個partition,但是也可以通過textFile()的第二個參數手動設置分區數量,只能比block數量多,不能比block數量少。


-------------------案例:文件字數統計   ;本地文件-------------------

##java實現
package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

/**
 * 使用本地文件創建RDD
 * 案例:統計文本文件字數
 * @author bcqf
 *
 */

public class LocalFile {
    public static void main(String[] args) {
        // 創建SparkConf
        SparkConf conf = new SparkConf().setAppName("LocalFile").setMaster("local");
        
        // 創建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 調用SparkContext以及其子類的textFile()方法,針對本地文件創建RDD
        JavaRDD<String> lines = sc.textFile("D://spark.txt");
        
        // 統計文本文件內的字數; Function<String, Integer> :String是接收類型,Integer是返回類型
        JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
            
        });
        
//Function2<T1, T2, R> ; call(T1 v1, T2 v2)
int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println("文字總數是:" + count); // 關閉JavaSparkContext sc.close(); } } -------------------案例:文件字數統計 ;HDFS文件------------------- ##java實現 package cn.spark.study.core; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; /** * 使用HDFS文件創建RDD * 案例:統計文本文件字數 * @author bcqf * */ public class HDFSFile { public static void main(String[] args) { // 創建SparkConf SparkConf conf = new SparkConf().setAppName("HDFSFile"); // 創建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 調用SparkContext以及其子類的textFile()方法,針對HDFS文件創建RDD JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt"); // 統計文本文件內的字數 JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(String v1) throws Exception { return v1.length(); } }); int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println("文字總數是:" + count); // 關閉JavaSparkContext sc.close(); } } [root@spark1 java]# cat hdfs_file.sh #運行腳本 /usr/local/spark/bin/spark-submit \ --class cn.spark.study.core.HDFSFile \ --num-executors 3 \ --driver-memory 100m \ --executor-memory 100m \ --executor-cores 3 \ /usr/local/spark-study/java/saprk-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar \ ##打maven包-->上傳-->運行 -------------------案例:文件字數統計 ;本地文件------------------- ##scala實現 package cn.spark.study.core import org.apache.spark.SparkConf import org.apache.spark.SparkContext object LocalFile { def main(args: Array[String]) { val conf = new SparkConf().setAppName("LocalFile").setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("D://spark.txt", 1) val count = lines.map {line => line.length() }.reduce(_ + _) println("file's count is: " + count) } } -------------------案例:文件字數統計 ;HDFS文件------------------- ##scala實現 package cn.spark.study.core import org.apache.spark.SparkConf import org.apache.spark.SparkContext object HDFSFile { def main(args: Array[String]) { val conf = new SparkConf().setAppName("HDFSFile") val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1) val count = lines.map {line => line.length() }.reduce(_ + _) println("file's count is: " + count) } } [root@spark1 scala]# cat hdfs_file.sh #運行腳本 /usr/local/spark/bin/spark-submit \ --class cn.spark.study.core.HDFSFile \ --num-executors 3 \ --driver-memory 100m \ --executor-memory 100m \ --executor-cores 3 \ /usr/local/spark-study/scala/spark-study-scala.jar \ ##打jar包(spark-study-scala.jar)-->上傳-->運行


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM