Spark之RDD(含Java運行環境配置)


一:RDD簡介

(一)RDD概念

RDD(Resilient Distributed DataSet),彈性分布式數據集,是Spark中最基本,也是最重要的數據抽象,代表一個不可變、可分區、里面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知度調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能重用工作集,這極大地提升了查詢速度。因為有RDD,所以Spark才支持分布式的計算。RDD由分區組成。

(二)RDD的五個特性

(1)一組分片(Partition),即數據集的基本組成單位。---RDD會被分片處理,用於並行計算

對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度
用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。
默認值就是程序所分配到的CPU Core的數目。

(2)一個計算每個分區的函數。---一個對每個split(數據分區)進行計算的函數,也稱為RDD的算子

Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。
compute函數會對迭代器進行復合,不需要保存每次計算的結果。

(3)RDD之間的依賴關系。(DAG有向無環圖調度構造依賴關系)

RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前后依賴關系。
在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。

(4)一個Partitioner(分區器),即RDD的分片函數。---用來對RDD的數據做手動分區

當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於范圍的RangePartitioner。
只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。
Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

(5)一個列表,存儲存取每個分片(Partition)的優先位置(preferred location)。

對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。
按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。

(三)RDD原理

(四)wordcount程序RDD執行流程

二:RDD創建方式

(一)通過讀取文件生成

由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等

val file = sc.textFile("/spark/input/c.txt")

(二)通過並行化的方式創建RDD

val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)

(三)其他方式

讀取數據庫等等其他的操作。也可以生成RDD。

RDD可以通過其他的RDD轉換而來的。

三:RDD編程API

Spark支持兩個類型(算子)操作:TransformationAction

(一)Transformation---不會觸發計算,延時加載(計算)

主要做的是就是將一個已有的RDD生成另外一個RDD。Transformation具有lazy特性(延遲加載)。Transformation算子的代碼不會真正被執行。只有當我們的程序里面遇到一個action算子的時候,代碼才會真正的被執行。這種設計讓Spark更加有效率地運行。

轉換

含義

map(func)

返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成

filter(func)

返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成

flatMap(func)

類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)

mapPartitions(func)

類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子

union(otherDataset)

對源RDD和參數RDD求並集后返回一個新的RDD

intersection(otherDataset)

對源RDD和參數RDD求交集后返回一個新的RDD

distinct([numTasks]))

對源RDD進行去重后返回一個新的RDD

groupByKey([numTasks])

在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

先按分區聚合 再總的聚合   每次要跟初始值交流 例如:aggregateByKey(0)(_+_,_+_) 對k/y的RDD進行操作

sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

與sortByKey類似,但是更靈活 第一個參數是根據什么排序  第二個是怎么排序 false倒序   第三個排序后分區數  默認與原RDD一樣

join(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD  相當於內連接(求交集)

cogroup(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

cartesian(otherDataset)

兩個RDD的笛卡爾積  的成很多個K/V

pipe(command, [envVars])

調用外部程序

coalesce(numPartitions)   

重新分區 第一個參數是要分多少區,第二個參數是否shuffle 默認false  少分區變多分區 true   多分區變少分區 false

repartition(numPartitions)

重新分區 必須shuffle  參數是要分多少區  少變多

repartitionAndSortWithinPartitions(partitioner)

重新分區+排序  比先分區再排序效率高  對K/V的RDD進行操作

foldByKey(zeroValue)(seqOp)

該函數用於K/V做折疊,合並處理 ,與aggregate類似   第一個括號的參數應用於每個V值  第二括號函數是聚合例如:_+_

combineByKey

合並相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)

partitionBy(partitioner)

對RDD進行分區  partitioner是分區器 例如new HashPartition(2

cache

RDD緩存,可以避免重復計算從而減少時間,區別:cache內部調用了persist算子,cache默認就一個緩存級別MEMORY-ONLY ,而persist則可以選擇緩存級別

persist

Subtract(rdd)

返回前rdd元素不在后rdd的rdd

leftOuterJoin

leftOuterJoin類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。

rightOuterJoin

rightOuterJoin類似於SQL中的有外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可

subtractByKey

substractByKey和基本轉換操作中的subtract類似只不過這里是針對K的,返回在主RDD中出現,並且不在otherRDD中出現的元素

(二)Action:直接觸發計算

 觸發代碼的運行,我們一段spark代碼里面至少需要有一個action操作

動作

含義

reduce(func)

通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的

collect()

在驅動程序中,以數組的形式返回數據集的所有元素

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(類似於take(1))

take(n)

返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed])

返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子

takeOrdered(n[ordering])

 

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本

saveAsSequenceFile(path

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。

saveAsObjectFile(path

 

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。

foreach(func)

在數據集的每一個元素上,運行函數func進行更新。

aggregate

先對分區進行操作,在總體操作

reduceByKeyLocally

 

lookup

 

top

 

fold

 

foreachPartition

 

(三)RDD算子示例

1.創建一個Int類型的RDD---parallelize

val rdd1 = sc.parallelize(Array(5,1,75,32,647,23,5))

2.對每個元素乘以2(map),並且排序(sortBy)---Transformation延遲計算

val rdd2 = rdd1.map(_*2).sortBy(x=>x,true)  第一個參數是匿名函數,第二個true表示升序,false降序

注意:x=>x是匿名函數,傳參是x,省略了類型,因為從數據中可以知道是Int  ---  (x:Int)=>{x}  ---  x=>x   返回值是x  根據返回值的大小進行排序

3.以數組的形式返回數據集的所有元素(collect)---Action立即執行

rdd2.collect

4.過濾掉大於100的數(filter)---Transformation延遲計算

val rdd3 = rdd2.filter(_<100)

5.flatMap嵌套展開,並執行內部函數---Transformation延遲計算

val rdd5 = rdd4.flatMap(_.split(" "))

6. 集合運算---Transformation延遲計算

並集:

val rdd8 = rdd6.union(rdd7)

交集:

val rdd9 = rdd6.intersection(rdd7)

7.分組操作groupByKey--- Transformation延遲計算

8.更多transformation和action算子示例,見:https://www.cnblogs.com/qingyunzong/p/8922135.html

四:RDD緩存機制---Transformation延遲計算

RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。

(一)案例實驗

在Spark shell中讀入文件,並進行文件行的計數,然后我們對數據進行緩存后再執行計算,注意,這里執行了count之后才會觸發計算,將數據緩存下來,當再次執行count的時候,就會用到緩存的結果

第二次執行效率有所提高 

五:RDD寬依賴和窄依賴

(一)寬依賴和窄依賴概念

每組依賴中左邊為父RDD,右邊為子RDD

窄依賴:是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操作都會產生窄依賴;(獨生子女)

寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都會產生寬依賴;(超生)

需要特別說明的是對join操作有兩種情況: 

(1)圖中左半部分join:如果兩個RDD在進行join操作時,一個RDD的partition僅僅和另一個RDD中已知個數的Partition進行join,那么這種類型的join操作就是窄依賴,例如圖1中左半部分的join操作(join with inputs co-partitioned);---多個子RDD,對於每個子RDD都滿足窄依賴

(2)圖中右半部分join:其它情況的join操作就是寬依賴,例如圖1中右半部分的join操作(join with inputs not co-partitioned),由於是需要父RDD的所有partition進行join的轉換,這就涉及到了shuffle,因此這種類型的join操作也是寬依賴。

總之:沒有依賴關系的stage是可以並行執行的,DAG根據寬依賴來划分stage,每個寬依賴的處理均會是一個stage的划分點。同一個stage中的多個操作會在一個task中完成。因為子RDD的分區僅依賴於父RDD的一個分區,因此這些步驟可以串行執行。

(二)依賴關系下的數據流視圖

在spark中,會根據RDD之間的依賴關系將DAG圖(有向無環圖)划分為不同的階段,

對於窄依賴,由於partition依賴關系的確定性,partition的轉換處理就可以在同一個線程里完成,窄依賴就被spark划分到同一個stage中,

而對於寬依賴,只能等父RDD shuffle處理完成后,下一個stage才能開始接下來的計算。

因此spark划分stage的整體思路是

從左往右推,遇到寬依賴就斷開,划分為一個stage;

遇到窄依賴就將這個RDD加入該stage中。

因此在圖2中RDD C,RDD D,RDD E,RDDF被構建在一個stage中,RDD A被構建在一個單獨的Stage中,而RDD B和RDD G又被構建在同一個stage中。

在spark中,Task的類型分為2種:ShuffleMapTaskResultTask

簡單來說,DAG的最后一個階段會為每個結果的partition生成一個ResultTask,即每個Stage里面的Task的數量是由該Stage中最后一個RDD的Partition的數量所決定的而其余所有階段都會生成ShuffleMapTask之所以稱之為ShuffleMapTask是因為它需要將自己的計算結果通過shuffle到下一個stage中;也就是說上圖中的stage1和stage2相當於mapreduce中的Mapper,而ResultTask所代表的stage3就相當於mapreduce中的reducer。

在之前動手操作了一個wordcount程序,因此可知,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過區別在於:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根據Key進行reduce,但spark除了這兩個算子還有其他的算子;因此從這個意義上來說,Spark比Hadoop的計算算子更為豐富。

六:RDD的Checkpoint(檢查點)機制:容錯機制

(一)檢查點是輔助RDD的lineage(血統)進行容錯的管理。

任務后面的步驟依賴於前面的步驟,如果一旦出現錯誤,則無法往后繼續計算,就必須從頭開始,這樣必然會使得效率降低。整個計算的周期就叫lineage,lineage越長,出錯的概率越大。所以我們可以設置一個檢查點,如果后面的計算過程出錯,則不需要從頭重新進行計算,則可以從檢查點的地方再次開始計算。

(二)RDD的檢查點有兩種類型

本地目錄(即檢查點的信息保存在本地的文件夾,一般用於開發和測試),HDFS目錄(保存在HDFS,多用於生產環境)

本地目錄:需要把spark-shell運行在本地模式上

sc.setCheckpointDir("/root/temp/sparkcheckpoint")

HDFS目錄:需要把spark-shell運行在集群模式上

sc.setCheckpointDir("hdfs://ns1/sparkcheckpoint")

七:Spark程序編寫---WordCount

(一)配置eclipse環境

https://blog.csdn.net/xummgg/article/details/50651867

(二)使用scala語言編寫---本地模式

package com.dt.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD


object WordCount {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //設置運行模式為本地運行,不然默認是集群模式
    conf.setMaster("local")
    //設置任務名
    conf.setAppName("WordCount")
    //設置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    //讀取文件,生成RDD
    val file:RDD[String] = sc.textFile("E:\\storm-kafka\\a.txt")
    //將每一行數據進行嵌套展開、分割
    val words:RDD[String] = file.flatMap(_.split(","))
    //進行單詞映射
    val wordOne:RDD[(String,Int)] = words.map((_,1))
    //進行單詞計數
    val wordCount:RDD[(String,Int)] = wordOne.reduceByKey(_+_)
    //按照單詞出現次數,降序排序
    val wordSort:RDD[(String,Int)] = wordCount.sortBy(x=>x._2, false)
    //將結果保存
    //wordSort.foreach(wordNumberPair => println(wordNumberPair._1 + " : " +wordNumberPair._2))//在命令行中打印該結果

    wordSort.saveAsTextFile("E:\\Output")
    
    sc.stop()
  }
}

(二)使用scala語言編寫---集群模式

package com.dt.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD


object WordCount {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //設置運行模式為本地運行,不然默認是集群模式
    //conf.setMaster("local")  //默認是集群模式  master,是一個Spark、Mesos或者Yarn集群的URL,或者是local[*]; //設置任務名
    conf.setAppName("WordCount")  //appName,是用來在Spark UI上顯示的應用名稱; //設置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    //讀取文件,生成RDD
    val file:RDD[String] = sc.textFile("/spark/input/c.txt")  //hdfs文件系統
    //將每一行數據進行嵌套展開、分割
    val words:RDD[String] = file.flatMap(_.split(" "))
    //進行單詞映射
    val wordOne:RDD[(String,Int)] = words.map((_,1))
    //進行單詞計數
    val wordCount:RDD[(String,Int)] = wordOne.reduceByKey(_+_)
    //按照單詞出現次數,降序排序
    val wordSort:RDD[(String,Int)] = wordCount.sortBy(x=>x._2, false)
    //將結果顯示
    wordSort.collect().foreach(wordNumberPair => println(wordNumberPair._1 + " : " +wordNumberPair._2))//在命令行中打印該結果

    //wordSort.saveAsTextFile("E:\\Output")
    
    sc.stop()
  }
}

導出jar包,進行job提交。需要指定url

[hadoop@hadoopH1 ~]$ spark-submit --class com.dt.spark.WordCount --master spark://hadoopH2:7077 --executor-memory 500m --total-executor-cores 1 ./wc.jar 

(三)使用Java7編寫

package cn.spark.wc;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        //初始化sparkcontext
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("WordCount");
        
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        //讀取文件,生成RDD
        JavaRDD<String> fileRDD = sc.textFile("E:\\storm-kafka\\a.txt");
        
        //將每一行數據按照","分割
        JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // TODO Auto-generated method stub
                return Arrays.asList(line.split(",")).iterator();    //返回列表迭代器
            }
        });
        
        //進行數據映射
        JavaPairRDD<String,Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {    //第一個是傳入參數:單詞,后面兩個參數:是返回的元組參數
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<>(word,1);
            }
        });
        
        //進行reduce操作
        JavaPairRDD<String,Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {    //前兩個是傳入參數,后一個類型是返回參數
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                // TODO Auto-generated method stub
                return i1 + i2;
            }
        });
                
        //下面進行排序操作sortByKey,由於是按照key進行排序,所以我們需要先將原來的元組<string,Integer>,變為<Integer,string>。---使用maptopair
        JavaPairRDD<Integer,String> countWordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<>(t._2,t._1);
            }
        });
        
        //開始使用sortByKey進行排序
        JavaPairRDD<Integer,String> sortCWRDD = countWordRDD.sortByKey(false);    //默認升序,false降序
        
        //將我們處理的數據從<Integer,String>轉換為<String,Integer>
        JavaPairRDD<String,Integer> resultRDD = sortCWRDD.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<>(t._2,t._1);
            }
        });
            
        //結果輸出
        resultRDD.saveAsTextFile("E:\\storm-kafka\\res");
    }
}

補充:運行常見錯誤(一)無法運行,出現(launch error)

配置項目運行時參數:

-Dspark.master=local

補充:運行常見錯誤(二)無法運行,出現(unsupported major.minor version 52.0)運行環境錯誤

 項目編譯得到的class文件的版本高於運行環境中jre的版本號,高版本JDK編譯的class不能在低版本的jvm虛擬機下運行,否則就會報這類錯,因此無法運行!49,50,51,52是Java編譯器內部的版本號,版本對應信息如下:

  Unsupported major.minor version 52.0 對應於 JDK1.8(JRE1.8) 

  Unsupported major.minor version 51.0 對應於 JDK1.7(JRE1.7) 

  Unsupported major.minor version 50.0 對應於 JDK1.6(JRE1.6) 

  Unsupported major.minor version 49.0 對應於 JDK1.5(JRE1.5

因此出現問題的原因就是:編譯產生的class文件是jdk1.8版本的,而jvm環境低於1.8,因此報錯:

修改Java運行環境:

運行結果查看:

(四)使用Java8編寫---lambda表達式

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCountWithJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WortCount");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("E:\\storm-kafka\\a.txt");
        JavaRDD<String> wordRdd = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
        JavaPairRDD<String, Integer> wordOneRDD = wordRdd.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((x, y) -> x + y);
        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);
        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        resultRDD.saveAsTextFile("E:\\storm-kafka\\res");

    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM