spark教程(三)-RDD認知與創建


RDD 介紹

spark 最重要的一個概念叫 RDD,Resilient Distributed Dataset,彈性分布式數據集,它是 spark 的最基本的數據(也是計算)抽象。

代碼中是一個抽象類,它代表一個 不可變、可分區、里面的元素可並行計算的數據集合。

 

RDD 的屬性

擁有一組分區:數據集的基本組成單位

擁有一個計算每個分區的函數

擁有一個分區器,partitioner,即 RDD 的分片函數

RDD 間存在依賴關系    【下面 RDD 特點中有解釋】

擁有一個列表,存儲每個 partitioner 的優先位置

 

優先位置

這里有個概念叫移動數據不如移動計算,之前在 hadoop 中接觸過這個概念,不多解釋;

RDD 也是分區存儲的,那么 RDD 的數據到底存儲在哪個節點,必須有個記錄;

當有請求過來時,為了提高運行效率,我們需要把任務下發到存儲所需數據的節點,這樣避免了網絡傳輸

 

RDD 的特點

只讀:無法進行更改,也就是不可變,如果需要改動,只能創建一個新的 RDD,也就是從一個 RDD 轉換成另一個 RDD

依賴關系:一個 RDD 由另一個 RDD 轉換得到,也就是 一個 RDD 依賴另一個 RDD,而且可以多層依賴,為了便於理解和描述,我們有時把依賴關系也叫血緣關系

  當 血緣關系 很長時,可以通過持久化 RDD 來切斷血緣關系

惰性執行:RDD 的執行是按照血緣關系延時計算的,也就是 惰性計算

 

依賴關系

依賴關系有兩種

窄依賴:一對一;RDDs 之間分區 一 一對應,也就是一個分區完全由另一個分區轉換得到,而不是由另外幾個分區轉換得到

寬依賴:多對一;與窄依賴相反,一個分區由另外幾個分區轉換得到

 

緩存

緩存是把 RDD 暫時存起來,用於如下場景:

1. 優化效率:一個程序中多次用到一個 RDD,可以先把這個 RDD 緩存起來,避免重復計算

2. 避免出錯

 

distFile = sc.textFile('README.md')
m = distFile.map(lambda x: len(x))      # map 是 轉換 操作,並不立即執行
m.cache()       # 把 map 的輸出緩存到內存中,其實 cache 就是 執行 操作

#或者 
m.persist() 

 

RDD 的創建

創建 RDD 有 3 種方式

1. 基於內存:從內存中的數據創建 RDD,並行化已存在的序列

2. 基於外部存儲介質:讀取外部數據源,比如 hdfs    【外部存儲介質中的數據,注意 spark 本身沒有存儲功能】

3. 由一個 RDD 轉換得到:這種方式不多解釋

 

並行化數據集合

並行化集合是通過在一個 迭代器或者集合 上 調用 SparkContext 的 parallelize 方法生成的    【內存中的數據】

可能還有個 makeRDD,我這里實驗不成功,makeRDD 封裝了 parallelize,功能是一樣的

data = range(10000)
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a+b)

為了創建一個能並行操作的分布式數據集,所有元素都將被拷貝;

然后我們可以調用 reduce 進行疊加,累計求和

 

並行化集合時一個重要的參數是將數據集切分的數量。一個切片對應一個 spark 任務,我們可指定切片數量

distData2 = sc.parallelize(data, 100)   # 切 100 分

 

外部數據集

由於 spark 本身沒有存儲功能,一般是從 本地文件、hadoop 等獲取外部數據集

 

本地文件

textFile(name, minPartitions=None, use_unicode=True)[source]
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

minPartitions 指定最小分區數,也就是說,實際分區數可能比這個數大,在 hdfs 中,文件本身就是 以 block 存儲的,此時這個 分區數 不能小於 block 數

 

示例代碼

distFile = sc.textFile('README.md')
distFile = sc.textFile('xx.csv')
distFile = sc.textFile('logs')      # 支持文件夾
# textFile("/my/directory/*.txt")   # 支持通配符
# textFile("/my/directory/*.gz")    # 支持壓縮文件

type(sc.textFile('README.md'))      # <class 'pyspark.rdd.RDD'>


distFile.map(lambda x: int(x[-1])).reduce(lambda a, b: a + b)
distFile.map(lambda x: len(x)).reduce(lambda a, b: a + b)   # map 的輸入是每一行,map 的作用是求每一行的 len
                                                            # reduce 的輸入是兩個數字,reduce 的作用是求這兩個數的和,
                                                            # 也就是把 所有行的 len 逐次求當前元素(當前累計 len 的和)與下一元素的和

讀取文件后自動生成 RDD;

各種讀取方式都支持 多種 文件格式,如 文件夾,通配符、壓縮文件

 

批量讀取本地文件

distFile = sc.wholeTextFiles('files')   # 讀取 files 文件夾下所有文件內容,返回 (filename, filecontent) 鍵值對

輸入必須是 路徑

 

讀取 hdfs 文件

rdd = sc.textFile('hdfs://hadoop10:9000/spark/dwd.csv')

 

RDD 的讀取與存儲

讀取和存儲其實都是一種 action 操作,這里只做簡單介紹

rdd.collect()    # 讀取
rdd.saveAsPickleFile('path')    
rdd.saveAsTextFile('path') 

存儲路徑事先不能存在;Pickle 序列化,編碼存儲;

 

RDD 算子

算子:認知心理學中的概念,它認為解決問題其實是將問題的初始狀態,通過一系列的操作,對問題狀態進行轉換,然后達到解決狀態,這個一系列操作稱為算子。

只是一種高大上的叫法,其實就是 RDD 操作

 

RDD 支持兩種類型的操作:transformations 和 actions

transformations:轉換,就是從一種 RDD 轉換成 另一種 符合 要求 的 RDD,類似於 map

actions:行動,執行計算,類似於 reduce,返回結果

 

值得注意的是,transformations 是惰性計算,也就是說 transformations 並不立即執行,只有 actions 時,才會執行 transformations

這使得 spark 高效,以 map-reduce 為例,如果我們 map 並不是目的,reduce 才是目的,那么 map 之后 就計算的話,輸出存在哪里呢?

如果存在文件,一浪費時間,二浪費地方,如果我們 reduce 時才執行 map,直接把 map 的龐大輸出存入內存即可,甚至 流式 傳給 reduce,非常高效。

 

RDD 分區

我們通過實操來驗證 RDD 是如何分區的

 

parallelize 分區

操作一:local 模式,不指定分區

data = range(5)
distData = sc.parallelize(data)
distData.saveAsPickleFile('output')   # 存儲路徑事先不能存在

輸出 4 個分區

_SUCCESS
part-00000  # 存放了 0    
part-00001  # 存放了 1        
part-00002  # 存放了 2    
part-00003  # 存放了 3,4    

 

4 從何而來呢?

[root@hadoop10 ~]# cat /proc/cpuinfo |grep "cores"|uniq
cpu cores    : 4

即 CPU 核數,

也就是說,parallelize 默認分區數為 CPU 核數

 

操作二:local 模式,指定分區

data = range(5)
distData = sc.parallelize(data, 3)
distData.saveAsTextFile('output')

輸出 3 個分區

_SUCCESS
part-00000  # 存放了 0    
part-00001  # 存放了 12        
part-00002  # 存放了 34  

也就是說,指定了分區數,就分為多少個區,注意,請指定 大於等於 2 的數,不要瞎搞

 

操作三:local 模式,指定分區,且分區數大於數據量

data = range(5)
distData = sc.parallelize(data, 6)
distData.saveAsTextFile('output')

輸出 6 個分區

_SUCCESS
part-00000  # 存放了 空    
part-00001  # 存放了 0        
part-00002  # 存放了 1
part-00000  # 存放了 2   
part-00001  # 存放了 3        
part-00002  # 存放了 4

也就是說,即使數據量不夠多個分區瓜分,也會按指定個數生成分區,沒分到數據的分區為空

 

通過源碼知悉,parallelize 的分區數優先級 為 自定義 --> max(cpu 核數,2),也就是最小為 2

 

textFile 分區

這個相對來說比較麻煩,我們做個簡單介紹。

 

首先看下 textFile 源碼

 可以看到,minPartitions 就是我們自定義的分區數;然后 調用了 hadoopFile 方法並把 minPartitions 傳給了 hadoopFile;

minPartitions 的默認值是 調用 defaultMinPartitions 方法,源碼如下

它取 defaultParallelism 和 2 的最小值;defaultParallelism 方法 源碼如下

spark.default.parallelism 是在 SPARK_HOME/conf/spark-default.conf 中設置的一個參數,默認是沒有這個配置的;

 

所以

1. 如果沒有在 textFile 中指定 minPartitions,而且也沒有配置 spark.default.parallelism,那么 minPartitions 最大是 2

2. 如果沒有在 textFile 中指定 minPartitions,但是配置了 spark.default.parallelism,那么 minPartitions 最大也是 2  【如果沒有自定義分區數 minPartitions,那么 minPartitions = min(max(CPU 核數, 2), 2);假設我們認為核數大於  2 ,故 minPartitions = 2】

3.  如果沒有在 textFile 中指定 minPartitions,minPartitions 最大是 2

4. 如果在 textFile 中指定了 minPartitions,minPartitions 就是指定數目

 

 

接下來,我們把 minPartitions 傳給 hadoopFile,hadoopFile 會計算文件(如果有多個文件,計算全部文件大小)大小 totalSize,然后把文件 split 成  minPartitions 分,每個切片大小為 goalSize = totalSize / minPartitions,取整

每個 hadoop block 也是一個 文件,在 hadoop中 block 大小是有 最大值的,比如 64M,也就是說我們不能把文件切分成 大於 block 大小的塊,也就是說 切片 取 min(goalSize, blockSize) 作為 真正的 切片大小,

但是 切片太小的話又沒有意義了,所以 splitSize = max(minSize,min(goalSize, blockSize)), 而 minSize = max(SPLIT_MINSIZE,1),也就是說 minSize 最小為 1, 於是 splitSize 最小為 1,

分數區計算:totalSize / splitSize,如果整除,就是本身,如果不整除,取整,然后 + 1

 

那我們來測試下

dataFile = sc.textFile('data.txt')
dataFile.saveAsTextFile('output')

7 字節文件,不指定分區數,結果分為 3 個區;splitSize = 7 / 2 = 3.5 = 3, 分區數 = 7 / 3 = 2 + 1 = 3

8 字節文件,不指定分區數,結果分為  2 個區;splitSize = 8 / 2 =  4,分區數 = 8 / 4 = 2

 

8 字節文件,指定 4 個分區,splitSize = 8 / 4 = 2,分區數 = 8 / 2 = 4;經測試,正解

 

 

參考資料:

https://blog.csdn.net/zjwcsdn123/article/details/80489537  spark中textfile方法對文件的分片

https://www.jianshu.com/p/e33671341f0d  spark通過textFile讀取hdfs數據分區數量規則


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM