Spark菜鳥學習營Day1 從Java到RDD編程


Spark菜鳥學習營Day1

從Java到RDD編程

菜鳥訓練營主要的目標是幫助大家從零開始,初步掌握Spark程序的開發。
Spark的編程模型是一步一步發展過來的,今天主要帶大家走一下這段路,讓我們從一段最最基礎的Java代碼開始。

問題:Java有哪些數據結構

大致有如下幾種,其中List與Map是最重要的:

  • List
  • Map
  • Set
  • Array
  • Heap
  • Stack
  • Queue
  • Tree

練習:構造一個1-5的List,把他們打印出來

寫法1

        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        for (int i = 0; i < input.size(); i++) {
            System.out.println(input.get(i));
        }

這個寫法存在的問題:

  1. 存在額外定義的變量i,需要對i進行邏輯控制(比如不能超過size)。
  2. 無法在循環內部如果對List進行安全更新,會產生邏輯問題。
  3. 這個寫法非常的不推薦使用!!!

寫法2

        Iterator<Integer> it = input.iterator();
        while (it.hasNext()) {
            System.out.println("a" + it.next());
        }

幾點說明:

  1. iterator相當於oracle中游標,擁有數據只讀和只能一次性讀的特點。
  2. hasNext()和next)()是游標必備的方法,其中hasNext是判斷是否有下一條,等同%Found,next是獲取下一條數,等同fetch。
    這個寫法存在的問題:
  3. 存在額外的對象it,讀取方法比較簡單,但是也需要記憶。

寫法3

        for (Integer i : input) {
            System.out.println(i);
        }

這是對寫法2的優化,邏輯完全等價,是一個語法糖。

寫法2和寫法3共同存在的問題:

  1. 執行是串行的,對大數量的情況不適合。

寫法4

input.parallelStream().forEach(
                new Consumer<Integer>() {
                    @Override
                    public void accept(Integer a) {
                        System.out.println(a);
                    }
                }
        );

在這個寫法中,執行是並行執行的,
從語句特點看是將一個對象出入了forEach方法中,進行一下說明:

  1. 多線程程序,其實是幾個互相獨立運行的程序,產生了一個問題,我們需要將邏輯傳遞給給各自獨立運行的程序
  2. 由於在Java中對象是全局存儲的,所以是唯一可以在獨立運行程序中傳遞的介質。
  3. 但是我們要的傳遞的是邏輯,比如要轉變為對象才能傳遞,如果變成對象,要做兩件事情:
    • 把邏輯,用方法來封裝(示例中的apply方法)
    • 把方法,用class或者intereface來封裝,更推薦intereface(示例中的Consumer)
      • 因為class既可以包含數據、又可以包含方法,而intereface中只包含方法,在我們的場景中,只需要傳遞方法
  4. 最后,接口提供方預先將intereface和method定義好,調用者按照模板使用即可。
    存在的問題:
  5. 語句比較長,邏輯也比較復雜

寫法5

        input.parallelStream().forEach(
                a -> System.out.println(a)
        );

這個寫法和寫法4邏輯沒有任何區別,主要是用lambda表達式簡化了對象的寫法,是一個語法糖。
寫法4和寫法5共同的問題是:
當數據量進一步擴大,一台機器就算是多線程運行也無法完成的情況下,是無法處理的。

小結

在上述的幾個寫法中,主要是通過對於List的一系列增強,從而解決了一系列的問題。

  1. 寫法2、3相比寫法1,使用Iterator增強List,實現了數據的可更新。
  2. 寫法4、5相比前面的寫法,使用Stream增強Iterator,實現了數據的並發運算。
    但是,還是留下了在更大數據集情況下的處理。這個是時候,我們需要引入Spark。
    Spark中核心是RDD,是對Stream的進一步增強,在並發的基礎上,增加了同時在多台機器上的分布式計算。解決了大數據集的問題。

練習:RDD編程

題目

進行RDD操作的訓練:

  1. 讀取交易記錄
  2. 按照SecurityId進行計數
  3. 根據計數,從高到低排序
  4. 輸出結果

練習過程:

Step1:獲取RDD數據,這個是調用公共方法,請注意的是,PracticePojo是預先准備好的測試數據名稱。

JavaRDD<PracticePojo> inputTradeRecords = this.getInputRDD(PracticePojo.class);

Step2:將數據轉為Key-Value格式

因為在Spark處理中,reduce、sort、group等操作涉及到在分布式機器間的數據交互,數據必須要有Key來作為分布操作的依據,所以我們首先要將數據格式進行轉換。

        JavaPairRDD<String, Integer> mappedTradeRecords = inputTradeRecords.mapToPair(
                new PairFunction<PracticePojo, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(PracticePojo practicePojo) throws Exception {
                        return new Tuple2<String, Integer>(practicePojo.getSecurityId(), 1);
                    }
                });

Step3:執行計數操作。

這里會用到reduceByKey方法,這里要注意的是,reduce是一個非常常用的操作,它有兩個操作步驟:

  1. 對數據按照Key進行分組
  2. 對每組數據執行算子計算,需要注意reduce的計算必須滿足交換律和結合律
    • 這個算子是指,當整個計算過程都滿足交換律和結合律后,我們可以用任意兩個數據之間的計算關系來定義整個計算關系
    • 比如:sum和count,可以用加法表示,max可以用a>b?a:b這樣的計算來表示,min是反過來
        JavaPairRDD<String, Integer> reducedTradeRecords = mappedTradeRecords.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

Step4:執行排序

這里有兩步操作:

  1. 首先是使用mapToPair方法進行數據變形,因為sortByKey方法僅是針對key來排序,而我們原始數據的key並不是我們要的排序字段,所以首先需要將key和value換一下
  2. 用sortByKey執行排序操作,需要注意的是,參數是一個布爾值,默認為升序,false表示降序
     JavaPairRDD<Integer, String> reversedTradeRecords = reducedTradeRecords.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
                    }
                });

        JavaPairRDD<Integer, String> sortedTradeRecords = reversedTradeRecords.sortByKey(false);

Step5:輸出結果

一般我們都是輸出一個JavaRDD,這里采用了預先定義的PracticeResultPojo結構,會將排序完的結果映射到這個結構上。

JavaRDD<PracticeResultPojo> resultTradeRecords = sortedTradeRecords.map(
                new Function<Tuple2<Integer, String>, PracticeResultPojo>() {
                    @Override
                    public PracticeResultPojo call(Tuple2<Integer, String> v1) throws Exception {
                        PracticeResultPojo resultPojo = new PracticeResultPojo();
                        resultPojo.setSecurityId(v1._2);
                        resultPojo.setCount(v1._1);
                        return resultPojo;
                    }
                });

小結

RDD編程,難點如下:

  1. 采用對象方式來封裝邏輯,和stream中處理方式一致,但是由於算子較多,每個算子要求不同,需要有一定練習來熟悉
  2. 輸入輸出為JavaRDD,但reduce、group、sort等操作需要中間使用mapToPair將JavaRDD轉成JavaPairRDD才能操作,會涉及到多次轉換
  3. reduce操作中,算子是更高層次的抽象,有一定的理解難度


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM