RDD 介紹
spark 最重要的一個概念叫 RDD,Resilient Distributed Dataset,彈性分布式數據集,它是 spark 的最基本的數據(也是計算)抽象。
代碼中是一個抽象類,它代表一個 不可變、可分區、里面的元素可並行計算的數據集合。
RDD 的屬性
擁有一組分區:數據集的基本組成單位
擁有一個計算每個分區的函數
擁有一個分區器,partitioner,即 RDD 的分片函數
RDD 間存在依賴關系 【下面 RDD 特點中有解釋】
擁有一個列表,存儲每個 partitioner 的優先位置
優先位置
這里有個概念叫移動數據不如移動計算,之前在 hadoop 中接觸過這個概念,不多解釋;
RDD 也是分區存儲的,那么 RDD 的數據到底存儲在哪個節點,必須有個記錄;
當有請求過來時,為了提高運行效率,我們需要把任務下發到存儲所需數據的節點,這樣避免了網絡傳輸
RDD 的特點
只讀:無法進行更改,也就是不可變,如果需要改動,只能創建一個新的 RDD,也就是從一個 RDD 轉換成另一個 RDD
依賴關系:一個 RDD 由另一個 RDD 轉換得到,也就是 一個 RDD 依賴另一個 RDD,而且可以多層依賴,為了便於理解和描述,我們有時把依賴關系也叫血緣關系;
當 血緣關系 很長時,可以通過持久化 RDD 來切斷血緣關系;
惰性執行:RDD 的執行是按照血緣關系延時計算的,也就是 惰性計算
依賴關系
依賴關系有兩種
窄依賴:一對一;RDDs 之間分區 一 一對應,也就是一個分區完全由另一個分區轉換得到,而不是由另外幾個分區轉換得到
寬依賴:多對一;與窄依賴相反,一個分區由另外幾個分區轉換得到
緩存
緩存是把 RDD 暫時存起來,用於如下場景:
1. 優化效率:一個程序中多次用到一個 RDD,可以先把這個 RDD 緩存起來,避免重復計算
2. 避免出錯:
distFile = sc.textFile('README.md') m = distFile.map(lambda x: len(x)) # map 是 轉換 操作,並不立即執行 m.cache() # 把 map 的輸出緩存到內存中,其實 cache 就是 執行 操作 #或者 m.persist()
RDD 的創建
創建 RDD 有 3 種方式
1. 基於內存:從內存中的數據創建 RDD,並行化已存在的序列
2. 基於外部存儲介質:讀取外部數據源,比如 hdfs 【外部存儲介質中的數據,注意 spark 本身沒有存儲功能】
3. 由一個 RDD 轉換得到:這種方式不多解釋
並行化數據集合
並行化集合是通過在一個 迭代器或者集合 上 調用 SparkContext 的 parallelize 方法生成的 【內存中的數據】
可能還有個 makeRDD,我這里實驗不成功,makeRDD 封裝了 parallelize,功能是一樣的
data = range(10000) distData = sc.parallelize(data) distData.reduce(lambda a, b: a+b)
為了創建一個能並行操作的分布式數據集,所有元素都將被拷貝;
然后我們可以調用 reduce 進行疊加,累計求和
並行化集合時一個重要的參數是將數據集切分的數量。一個切片對應一個 spark 任務,我們可指定切片數量
distData2 = sc.parallelize(data, 100) # 切 100 分
外部數據集
由於 spark 本身沒有存儲功能,一般是從 本地文件、hadoop 等獲取外部數據集
本地文件
textFile(name, minPartitions=None, use_unicode=True)[source] Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)
minPartitions 指定最小分區數,也就是說,實際分區數可能比這個數大,在 hdfs 中,文件本身就是 以 block 存儲的,此時這個 分區數 不能小於 block 數
示例代碼
distFile = sc.textFile('README.md') distFile = sc.textFile('xx.csv') distFile = sc.textFile('logs') # 支持文件夾 # textFile("/my/directory/*.txt") # 支持通配符 # textFile("/my/directory/*.gz") # 支持壓縮文件 type(sc.textFile('README.md')) # <class 'pyspark.rdd.RDD'> distFile.map(lambda x: int(x[-1])).reduce(lambda a, b: a + b) distFile.map(lambda x: len(x)).reduce(lambda a, b: a + b) # map 的輸入是每一行,map 的作用是求每一行的 len # reduce 的輸入是兩個數字,reduce 的作用是求這兩個數的和, # 也就是把 所有行的 len 逐次求當前元素(當前累計 len 的和)與下一元素的和
讀取文件后自動生成 RDD;
各種讀取方式都支持 多種 文件格式,如 文件夾,通配符、壓縮文件
批量讀取本地文件
distFile = sc.wholeTextFiles('files') # 讀取 files 文件夾下所有文件內容,返回 (filename, filecontent) 鍵值對
輸入必須是 路徑
讀取 hdfs 文件
rdd = sc.textFile('hdfs://hadoop10:9000/spark/dwd.csv')
RDD 的讀取與存儲
讀取和存儲其實都是一種 action 操作,這里只做簡單介紹
rdd.collect() # 讀取 rdd.saveAsPickleFile('path') rdd.saveAsTextFile('path')
存儲路徑事先不能存在;Pickle 序列化,編碼存儲;
RDD 算子
算子:認知心理學中的概念,它認為解決問題其實是將問題的初始狀態,通過一系列的操作,對問題狀態進行轉換,然后達到解決狀態,這個一系列操作稱為算子。
只是一種高大上的叫法,其實就是 RDD 操作
RDD 支持兩種類型的操作:transformations 和 actions
transformations:轉換,就是從一種 RDD 轉換成 另一種 符合 要求 的 RDD,類似於 map
actions:行動,執行計算,類似於 reduce,返回結果
值得注意的是,transformations 是惰性計算,也就是說 transformations 並不立即執行,只有 actions 時,才會執行 transformations
這使得 spark 高效,以 map-reduce 為例,如果我們 map 並不是目的,reduce 才是目的,那么 map 之后 就計算的話,輸出存在哪里呢?
如果存在文件,一浪費時間,二浪費地方,如果我們 reduce 時才執行 map,直接把 map 的龐大輸出存入內存即可,甚至 流式 傳給 reduce,非常高效。
RDD 分區
我們通過實操來驗證 RDD 是如何分區的
parallelize 分區
操作一:local 模式,不指定分區
data = range(5) distData = sc.parallelize(data) distData.saveAsPickleFile('output') # 存儲路徑事先不能存在
輸出 4 個分區
_SUCCESS part-00000 # 存放了 0 part-00001 # 存放了 1 part-00002 # 存放了 2 part-00003 # 存放了 3,4
4 從何而來呢?
[root@hadoop10 ~]# cat /proc/cpuinfo |grep "cores"|uniq cpu cores : 4
即 CPU 核數,
也就是說,parallelize 默認分區數為 CPU 核數
操作二:local 模式,指定分區
data = range(5) distData = sc.parallelize(data, 3) distData.saveAsTextFile('output')
輸出 3 個分區
_SUCCESS part-00000 # 存放了 0 part-00001 # 存放了 12 part-00002 # 存放了 34
也就是說,指定了分區數,就分為多少個區,注意,請指定 大於等於 2 的數,不要瞎搞
操作三:local 模式,指定分區,且分區數大於數據量
data = range(5) distData = sc.parallelize(data, 6) distData.saveAsTextFile('output')
輸出 6 個分區
_SUCCESS part-00000 # 存放了 空 part-00001 # 存放了 0 part-00002 # 存放了 1 part-00000 # 存放了 2 part-00001 # 存放了 3 part-00002 # 存放了 4
也就是說,即使數據量不夠多個分區瓜分,也會按指定個數生成分區,沒分到數據的分區為空
通過源碼知悉,parallelize 的分區數優先級 為 自定義 --> max(cpu 核數,2),也就是最小為 2
textFile 分區
這個相對來說比較麻煩,我們做個簡單介紹。
首先看下 textFile 源碼
可以看到,minPartitions 就是我們自定義的分區數;然后 調用了 hadoopFile 方法並把 minPartitions 傳給了 hadoopFile;
minPartitions 的默認值是 調用 defaultMinPartitions 方法,源碼如下
它取 defaultParallelism 和 2 的最小值;defaultParallelism 方法 源碼如下
spark.default.parallelism 是在 SPARK_HOME/conf/spark-default.conf 中設置的一個參數,默認是沒有這個配置的;
所以:
1. 如果沒有在 textFile 中指定 minPartitions,而且也沒有配置 spark.default.parallelism,那么 minPartitions 最大是 2
2. 如果沒有在 textFile 中指定 minPartitions,但是配置了 spark.default.parallelism,那么 minPartitions 最大也是 2 【如果沒有自定義分區數 minPartitions,那么 minPartitions = min(max(CPU 核數, 2), 2);假設我們認為核數大於 2 ,故 minPartitions = 2】
3. 如果沒有在 textFile 中指定 minPartitions,minPartitions 最大是 2
4. 如果在 textFile 中指定了 minPartitions,minPartitions 就是指定數目
接下來,我們把 minPartitions 傳給 hadoopFile,hadoopFile 會計算文件(如果有多個文件,計算全部文件大小)大小 totalSize,然后把文件 split 成 minPartitions 分,每個切片大小為 goalSize = totalSize / minPartitions,取整
每個 hadoop block 也是一個 文件,在 hadoop中 block 大小是有 最大值的,比如 64M,也就是說我們不能把文件切分成 大於 block 大小的塊,也就是說 切片 取 min(goalSize, blockSize) 作為 真正的 切片大小,
但是 切片太小的話又沒有意義了,所以 splitSize = max(minSize,min(goalSize, blockSize)), 而 minSize = max(SPLIT_MINSIZE,1),也就是說 minSize 最小為 1, 於是 splitSize 最小為 1,
分數區計算:totalSize / splitSize,如果整除,就是本身,如果不整除,取整,然后 + 1
那我們來測試下
dataFile = sc.textFile('data.txt') dataFile.saveAsTextFile('output')
7 字節文件,不指定分區數,結果分為 3 個區;splitSize = 7 / 2 = 3.5 = 3, 分區數 = 7 / 3 = 2 + 1 = 3
8 字節文件,不指定分區數,結果分為 2 個區;splitSize = 8 / 2 = 4,分區數 = 8 / 4 = 2
8 字節文件,指定 4 個分區,splitSize = 8 / 4 = 2,分區數 = 8 / 2 = 4;經測試,正解
參考資料:
https://blog.csdn.net/zjwcsdn123/article/details/80489537 spark中textfile方法對文件的分片
https://www.jianshu.com/p/e33671341f0d spark通過textFile讀取hdfs數據分區數量規則