I. What Is an RDD
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
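In other words, an RDD is Spark's basic abstraction: an immutable, partitioned collection of elements that can be operated on in parallel.
- Resilient: the data can be stored on disk or in memory (multiple storage levels are available)
- Distributed: a read-only collection of objects spread across the cluster (made up of multiple partitions)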
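(I) Characteristics
1. Partitions
As the definition above says, an RDD is an abstraction. The data itself is stored in the RDD's partitions, and these partitions may sit on a single node or be spread across different nodes.
2. Dependencies
By definition an RDD is read-only and immutable. To "change" its values, you do not modify it in place; you keep creating new variables instead. For example: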
# define a variable
x = 2
# "change" the value: there is now one more variable, y, which holds the new value
y = 2 * x + 1
In this way new variables keep being created, and together they form a lineage of dependencies.
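The lineage idea can be sketched in plain Python. This is a toy model, not Spark's implementation: the `Node` class and its `derive` and `lineage` methods are hypothetical names made up for illustration. Each derived value keeps a reference to its parent instead of modifying it.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)  # frozen: instances cannot be modified after creation
class Node:
    """Toy stand-in for an immutable dataset (hypothetical, not a Spark class)."""
    name: str
    value: int
    parent: Optional["Node"] = None

    def derive(self, name: str, func: Callable[[int], int]) -> "Node":
        # Never change self; build a new node that remembers where it came from.
        return Node(name=name, value=func(self.value), parent=self)

    def lineage(self) -> List[str]:
        # Walk the parent links back to the root.
        chain, node = [], self
        while node is not None:
            chain.append(node.name)
            node = node.parent
        return chain


x = Node("x", 2)
y = x.derive("y", lambda v: 2 * v + 1)
z = y.derive("z", lambda v: v * 10)

print(x.value, y.value, z.value)  # 2 5 50
print(z.lineage())                # ['z', 'y', 'x']
```

Because `x` is never modified, any derived value can be rebuilt by replaying the chain from its root.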
3. Caching
By default an RDD is cached in memory, but several caching strategies are supported and can be changed flexibly.
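(II) Core Properties
Scheduling and computation both rely on these five properties:
- A list of partitions
An RDD is an abstraction that corresponds to multiple partitions, so it has a partition-list property.
- A list of dependencies
An RDD is immutable and is derived from other RDDs, so it keeps a list of the RDDs it depends on. This matches the dependency characteristic described above.
- A compute function, used to compute the values in each partition of the RDD
- A partitioning strategy (optional)
Describes how the data is mapped to the RDD's partitions.
- A list of preferred locations (optional; for HDFS this enables data locality and avoids moving data)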
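How a partitioning strategy maps records to partitions can be sketched as follows. This is a simplified, pure-Python illustration of hash partitioning for key-value data. `partition_for` and `split_by_key` are hypothetical helpers, and `zlib.crc32` is used only to get a stable hash; it is not the hash function Spark uses.

```python
import zlib
from typing import Dict, List, Tuple

Pair = Tuple[str, int]


def partition_for(key: str, num_partitions: int) -> int:
    # Stable hash of the key, reduced to a partition index.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def split_by_key(pairs: List[Pair], num_partitions: int) -> Dict[int, List[Pair]]:
    # Start with empty partitions, then route each record by its key.
    partitions: Dict[int, List[Pair]] = {i: [] for i in range(num_partitions)}
    for key, value in pairs:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions


pairs = [("a", 1), ("b", 1), ("a", 1), ("c", 1), ("b", 1)]
partitions = split_by_key(pairs, num_partitions=3)

for index, records in partitions.items():
    print(index, records)
```

In this sketch, records with equal keys always land in the same partition, so they can be combined without looking at other partitions.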
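II. Creating RDDs
There are three ways to create an RDD:
- Create it from external files
- Parallelize a collection
- Derive a child RDD from a parent RDD
(I) Creating from External Files
- Local disk files are supported
- Whole directories, multiple files, and wildcards are supported
- Compressed files are supported
- HDFS is supported
Files are read with the textFile method: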
textFile(name, minPartitions=None, use_unicode=True)
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
The forms used to read local files, directories, and HDFS paths:
sc.textFile("/1.txt,/02.txt")  # multiple files are supported, separated by commas
sc.textFile("/*.txt")  # wildcards are supported
(II) Parallelizing a Collection
Parallelizing a collection means creating an RDD from an in-memory data structure such as a list.
>>> sc = spark.sparkContext
>>> sc
<pyspark.context.SparkContext object at 0x0000000000ADB7B8>
>>> x = [1,2,3]
>>> rdd = sc.parallelize(x)
>>> rdd.collect()
[Stage 0:> (0 + 0) / 4]
[1, 2, 3]
About the parallelize method:
parallelize(c, numSlices=None)
Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.
Here c is the data passed in, for example a list, and numSlices is the number of slices. Each slice can be processed by its own task.
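The effect of numSlices can be pictured with a small pure-Python sketch. `slice_collection` is a hypothetical helper that cuts a list into contiguous, roughly equal chunks; the exact boundaries Spark chooses may differ.

```python
from typing import List, Sequence


def slice_collection(data: Sequence[int], num_slices: int) -> List[List[int]]:
    """Split data into num_slices contiguous, roughly equal chunks."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    return [
        list(data[i * n // num_slices:(i + 1) * n // num_slices])
        for i in range(num_slices)
    ]


slices = slice_collection(list(range(10)), num_slices=4)
print(slices)  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```

Each inner list stands for one slice, that is, the input of one task.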
(III) Deriving a Child RDD from a Parent RDD
1. Transformation
Transformation | Meaning
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]) | Return a new dataset that contains the distinct elements of the source dataset.
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument.
The functions in the Transformation table are what you use to process the data.
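To get a feel for what each transformation computes, the sketch below mimics them on ordinary Python lists. It is a local analogy only, with no Spark involved. The sample data is made up, and results are sorted only for readable output, since ordering after distinct or intersection should not be relied on.

```python
source = [1, 2, 2, 3, 4]
other = [3, 4, 5]

# map(func): exactly one output per input
mapped = [x * 10 for x in source]

# filter(func): keep the elements for which func returns true
filtered = [x for x in source if x % 2 == 0]

# flatMap(func): zero or more outputs per input, flattened into one list
flat_mapped = [y for x in source for y in (x, -x)]

# distinct(): remove duplicates
distinct = sorted(set(source))

# union(otherDataset): concatenation, duplicates are kept
union = source + other

# intersection(otherDataset): common elements, duplicates removed
intersection = sorted(set(source) & set(other))


# mapPartitions(func): func receives an iterator over one whole partition
def sum_partition(iterator):
    yield sum(iterator)


partitions = [[1, 2], [2, 3, 4]]
per_partition = [out for part in partitions for out in sum_partition(iter(part))]

print(mapped, filtered, flat_mapped)
print(distinct, union, intersection, per_partition)
```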
2. Action
Action | Meaning
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() | Return the number of elements in the dataset.
first() | Return the first element of the dataset (similar to take(1)).
take(n) | Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator.
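Why reduce asks for a commutative and associative function can be shown with a small pure-Python sketch. `partitioned_reduce` is a hypothetical helper that reduces each partition first and then combines the partial results, which is roughly how a parallel reduce has to work. The two layouts are made-up partitionings of the same data.

```python
from functools import reduce
from typing import Callable, List


def partitioned_reduce(partitions: List[List[int]], func: Callable[[int, int], int]) -> int:
    # Reduce inside each partition first, then combine the partial results.
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


def add(x: int, y: int) -> int:
    return x + y


def sub(x: int, y: int) -> int:
    return x - y


# The same elements 1..4, split into partitions in two different ways.
layout_a = [[1, 2], [3, 4]]
layout_b = [[1], [2, 3, 4]]

print(partitioned_reduce(layout_a, add), partitioned_reduce(layout_b, add))  # 10 10
print(partitioned_reduce(layout_a, sub), partitioned_reduce(layout_b, sub))  # 0 6
```

Addition gives the same answer for both layouts. Subtraction is neither commutative nor associative, so its result depends on how the data happens to be partitioned.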
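3. Differences between Transformation and Action
- A Transformation takes an RDD as input and returns an RDD; an Action takes an RDD as input and returns a value
- Transformations are lazy: a Transformation only records how one RDD is derived from another and does not trigger computation; an Action is executed immediately
So even though the code contains Transformations, they do not run right away. The Transformation code is triggered only when an Action is called.
For details, see: http://spark.apache.org/docs/2.0.2/programming-guide.html#transformations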
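The lazy behaviour can be modelled with a few lines of plain Python. `LazyDataset` is a hypothetical toy class, not part of Spark: its `map` and `filter` only record steps, and `collect` plays the role of the Action that finally runs them.

```python
from typing import Callable, List, Tuple


class LazyDataset:
    """Toy model of lazy evaluation (hypothetical, not a Spark class)."""

    def __init__(self, data: List[int], ops: Tuple[Tuple[str, Callable], ...] = ()):
        self._data = data
        self._ops = ops

    def map(self, func: Callable) -> "LazyDataset":
        # Transformation: record the step and return a new dataset.
        return LazyDataset(self._data, self._ops + (("map", func),))

    def filter(self, func: Callable) -> "LazyDataset":
        # Transformation: record the step and return a new dataset.
        return LazyDataset(self._data, self._ops + (("filter", func),))

    def collect(self) -> List[int]:
        # Action: only now are the recorded steps actually run.
        result = list(self._data)
        for kind, func in self._ops:
            if kind == "map":
                result = [func(x) for x in result]
            else:
                result = [x for x in result if func(x)]
        return result


calls = []


def double(x: int) -> int:
    calls.append(x)  # record every real invocation
    return x * 2


pipeline = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_action = len(calls)  # still 0: nothing has run yet
result = pipeline.collect()       # the action triggers the recorded steps
calls_after_action = len(calls)

print(calls_before_action, result, calls_after_action)  # 0 [6, 8] 4
```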
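4. Persistence
Persistence stores an RDD's data so that it can be reused. Like a Transformation, it is not executed immediately:
- The cache method caches the data in memory
cache() itself calls persist(), with the storage level set to MEMORY_ONLY.
- The persist method supports more flexible caching strategies
With persist you set the StorageLevel by hand to get the storage level the project needs.
The storage levels are listed below: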
Storage Level | Meaning
MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER | Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY | Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
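What caching buys you can be illustrated with a toy model in plain Python. `CountingDataset` is a hypothetical class, not Spark's API. It counts how often the underlying data is recomputed, and its `cache` method only sets a flag, mirroring the point that persistence is not executed immediately.

```python
from typing import Callable, List, Optional


class CountingDataset:
    """Toy model of caching (hypothetical, not a Spark class)."""

    def __init__(self, compute: Callable[[], List[int]]):
        self._compute = compute
        self._use_cache = False
        self._cached: Optional[List[int]] = None
        self.compute_calls = 0

    def cache(self) -> "CountingDataset":
        # Only sets a flag; nothing is computed or stored yet.
        self._use_cache = True
        return self

    def _materialize(self) -> List[int]:
        if self._cached is not None:
            return self._cached
        self.compute_calls += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data  # keep the result for later actions
        return data

    def count(self) -> int:
        return len(self._materialize())

    def total(self) -> int:
        return sum(self._materialize())


def expensive() -> List[int]:
    return [x * x for x in range(5)]


uncached = CountingDataset(expensive)
uncached.count()
uncached.total()

cached = CountingDataset(expensive).cache()
calls_right_after_cache = cached.compute_calls  # 0: cache() is lazy
cached.count()
cached.total()

print(uncached.compute_calls, calls_right_after_cache, cached.compute_calls)  # 2 0 1
```

Without caching, every action recomputes the data. With caching, the first action computes it and later actions reuse the stored copy.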
5. Example
We can use word count to get a feel for how this is used in practice:
(1) Prepare the test file

Preface “The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “family” and the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.” So many people have written and claimed that their families were the originals of the Forsytes that one has been almost encouraged to believe in the typicality of an imagined species. Manners change and modes evolve, and “Timothy’s on the Bayswater Road” becomes a nest of the unbelievable in all except essentials; we shall not look upon its like again, nor perhaps on such a one as James or Old Jolyon. And yet the figures of Insurance Societies and the utterances of Judges reassure us daily that our earthly paradise is still a rich preserve, where the wild raiders, Beauty and Passion, come stealing in, filching security from beneath our noses. 
As surely as a dog will bark at a brass band, so will the essential Soames in human nature ever rise up uneasily against the dissolution which hovers round the folds of ownership. “Let the dead Past bury its dead” would be a better saying if the Past ever died. The persistence of the Past is one of those tragi-comic blessings which each new age denies, coming cocksure on to the stage to mouth its claim to a perfect novelty. But no Age is so new as that! Human Nature, under its changing pretensions and clothes, is and ever will be very much of a Forsyte, and might, after all, be a much worse animal. Looking back on the Victorian era, whose ripeness, decline, and ‘fall-of’ is in some sort pictured in “The Forsyte Saga,” we see now that we have but jumped out of a frying-pan into a fire. It would be difficult to substantiate a claim that the case of England was better in 1913 than it was in 1886, when the Forsytes assembled at Old Jolyon’s to celebrate the engagement of June to Philip Bosinney. And in 1920, when again the clan gathered to bless the marriage of Fleur with Michael Mont, the state of England is as surely too molten and bankrupt as in the eighties it was too congealed and low-percented. If these chronicles had been a really scientific study of transition one would have dwelt probably on such factors as the invention of bicycle, motor-car, and flying-machine; the arrival of a cheap Press; the decline of country life and increase of the towns; the birth of the Cinema. Men are, in fact, quite unable to control their own inventions; they at best develop adaptability to the new conditions those inventions create.
(2) Write the code
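>>> sc = spark.sparkContext
>>> # raw string, so that \s and \t in the Windows path are not treated as escape sequences
>>> rdd1 = sc.textFile(r'I:\spark_file\test.txt')
>>> # Transformations: these only record the operations, nothing is executed yet
>>> wordsRDD = rdd1.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
>>> # Action: this triggers the Transformations above
>>> wordsRDD.collect()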
The final result:

('fenced', 1), ('sharp', 7), ('costs.', 1), ('state;', 1), ('Taking', ), ('staring,', 1), ('doctors', 1), ('employment', 3), ('white-bearded', 1), ('penniless', 1), ('Forsyteism.', 1), ('random', 1), ('singers!', 1), ('tastes.',), ('good!’', 1), ('egg', 1), ('Bentham,', 3), ('naturally', 6), ('stream!', 1), ('horrid!”', 1), ('other.', 11), ('nightshirt,', 1), ('judgment', 11), ('slightest', 2), ('chapel,', 1), ('cages!', 1), ('nineteen', 1), ('grass-plot,', 1), ('Testament', 1), ('betrayal', 1), ('nerve,', 1), ('together;', 4), ('scene!',), ('exceedingly', 1), ('compunctious.', 1), ('Haven’t', 2), ('”', 34), ('poetry;', 1), ('thinkable.', 1), ('Phil’s', 1), ('floors', 1), ('kinds', 1), ('arrested', 1), ('Fresh', 1), ('lump', 1), ('purse,', 2), ('inarticulate', 1), ('withstand;', 1),...
The result can also be saved to local files with saveAsTextFile.
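To see what each stage of the word count pipeline produces, the same three steps can be replayed on a tiny input in plain Python. This is a local analogy with no Spark involved, and the two sample lines are made up for illustration.

```python
from typing import Dict, List, Tuple

lines = [
    "the folk of the old Sagas",
    "the sense of home",
]

# flatMap: each line yields zero or more words.
words: List[str] = [word for line in lines for word in line.split(" ")]

# map: each word becomes a (word, 1) pair.
pairs: List[Tuple[str, int]] = [(word, 1) for word in words]

# reduceByKey: values that share a key are merged with x + y.
counts: Dict[str, int] = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

print(sorted(counts.items()))
```

Splitting on a single space leaves punctuation attached to the words, which is why the real output contains keys such as 'costs.' and 'state;'.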