彈性分布式數據集(RDD)
Spark是以RDD概念為中心運行的。RDD是一個容錯的、可以被並行操作的元素集合。創建一個RDD有兩個方法:在你的驅動程序中並行化一個已經存在的集合;從外部存儲系統中引用一個數據集。RDD的一大特性是分布式存儲,分布式存儲在最大的好處是可以讓數據在不同工作節點並行存儲,以便在需要數據時並行運算。彈性指其在節點存儲時,既可以使用內存,也可已使用外存,為使用者進行大數據處理提供方便。除此之外,RDD的另一大特性是延遲計算,即一個完整的RDD運行任務被分為兩部分:Transformation和Action
1.Transformation
Transformation用於對RDD的創建,RDD只能使用Transformation創建,同時還提供大量操作方法,包括map,filter,groupBy,join等,RDD利用這些操作生成新的RDD,但是需要注意,無論多少次Transformation,在RDD中真正數據計算Action之前都不可能真正運行。
2.Action
Action是數據執行部分,其通過執行count,reduce,collect等方法真正執行數據的計算部分。實際上,RDD中所有的操作都是Lazy模式進行,運行在編譯中不會立即計算最終結果,而是記住所有操作步驟和方法,只有顯示的遇到啟動命令才執行。這樣做的好處在於大部分前期工作在Transformation時已經完成,當Action工作時,只需要利用全部自由完成業務的核心工作。
下面是在python中對RDD的生成,以及一些基本的Transformation,Action操作。
# -*- coding:utf-8 -*- from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext import math appName ="jhl_spark_1" #你的應用程序名稱 master= "local"#設置單機 conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext sc = SparkContext(conf=conf) # parallelize:並行化數據,轉化為RDD data = [1, 2, 3, 4, 5] distData = sc.parallelize(data, numSlices=10) # numSlices為分塊數目,根據集群數進行分塊 # textFile讀取外部數據 rdd = sc.textFile("./c2.txt") # 以行為單位讀取外部文件,並轉化為RDD print rdd.collect() # map:迭代,對數據集中數據進行單獨操作 def my_add(l): return (l,l) data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) # 並行化數據集 result = distData.map(my_add) print (result.collect()) # 返回一個分布數據集 # filter:過濾數據 def my_add(l): result = False if l > 2: result = True return result data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)#並行化數據集,分片 result = distData.filter(my_add) print (result.collect())#返回一個分布數據集 # zip:將兩個RDD對應元素組合為元組 x = sc.parallelize(range(0,5)) y = sc.parallelize(range(1000, 1005)) print x.zip(y).collect() #union 組合兩個RDD print x.union(x).collect() # Aciton操作 # collect:返回RDD中的數據 rdd = sc.parallelize(range(1, 10)) print rdd print rdd.collect() # collectAsMap:以rdd元素為元組,以元組中一個元素作為索引返回RDD中的數據 m = sc.parallelize([('a', 2), (3, 4)]).collectAsMap() print m['a'] print m[3] # groupby函數:根據提供的方法為RDD分組: rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) def fun(i): return i % 2 result = rdd.groupBy(fun).collect() print [(x, sorted(y)) for (x, y) in result] # reduce:對數據集進行運算 rdd = sc.parallelize(range(1, 10)) result = rdd.reduce(lambda a, b: a + b) print result
除上述以外,對RDD還存在一些常見數據操作如:
name()返回rdd的名稱
min()返回rdd中的最小值
sum()疊加rdd中所有元素
take(n)取rdd中前n個元素
count()返回rdd的元素個數
更多操作請參考 :http://spark.apache.org/docs/latest/api/python/index.html