Spark中RDD的常用操作(Python)


彈性分布式數據集(RDD)

        Spark是以RDD概念為中心運行的。RDD是一個容錯的、可以被並行操作的元素集合。創建一個RDD有兩個方法:在你的驅動程序中並行化一個已經存在的集合;從外部存儲系統中引用一個數據集。RDD的一大特性是分布式存儲,分布式存儲在最大的好處是可以讓數據在不同工作節點並行存儲,以便在需要數據時並行運算。彈性指其在節點存儲時,既可以使用內存,也可已使用外存,為使用者進行大數據處理提供方便。除此之外,RDD的另一大特性是延遲計算,即一個完整的RDD運行任務被分為兩部分:Transformation和Action

1.Transformation

Transformation用於對RDD的創建,RDD只能使用Transformation創建,同時還提供大量操作方法,包括map,filter,groupBy,join等,RDD利用這些操作生成新的RDD,但是需要注意,無論多少次Transformation,在RDD中真正數據計算Action之前都不可能真正運行。

2.Action

Action是數據執行部分,其通過執行count,reduce,collect等方法真正執行數據的計算部分。實際上,RDD中所有的操作都是Lazy模式進行,運行在編譯中不會立即計算最終結果,而是記住所有操作步驟和方法,只有顯示的遇到啟動命令才執行。這樣做的好處在於大部分前期工作在Transformation時已經完成,當Action工作時,只需要利用全部自由完成業務的核心工作。

 

下面是在python中對RDD的生成,以及一些基本的Transformation,Action操作。

 
         
# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import math
appName ="jhl_spark_1" #你的應用程序名稱
master= "local"#設置單機
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)

# parallelize:並行化數據,轉化為RDD
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, numSlices=10)  # numSlices為分塊數目,根據集群數進行分塊

# textFile讀取外部數據
rdd = sc.textFile("./c2.txt")  # 以行為單位讀取外部文件,並轉化為RDD
print rdd.collect()

# map:迭代,對數據集中數據進行單獨操作
def my_add(l):
    return (l,l)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # 並行化數據集
result = distData.map(my_add)
print (result.collect())  # 返回一個分布數據集


# filter:過濾數據
def my_add(l):
    result = False
    if l > 2:
        result = True
    return result
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)#並行化數據集,分片
result = distData.filter(my_add)
print (result.collect())#返回一個分布數據集

# zip:將兩個RDD對應元素組合為元組
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
print x.zip(y).collect()





#union 組合兩個RDD
print x.union(x).collect()
# Aciton操作

# collect:返回RDD中的數據
rdd = sc.parallelize(range(1, 10))
print rdd
print rdd.collect()

# collectAsMap:以rdd元素為元組,以元組中一個元素作為索引返回RDD中的數據
m = sc.parallelize([('a', 2), (3, 4)]).collectAsMap()
print m['a']
print m[3]

# groupby函數:根據提供的方法為RDD分組:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
def fun(i):
    return i % 2
result = rdd.groupBy(fun).collect()
print [(x, sorted(y)) for (x, y) in result]

# reduce:對數據集進行運算
rdd = sc.parallelize(range(1, 10))
result = rdd.reduce(lambda a, b: a + b)
print result

  

 除上述以外,對RDD還存在一些常見數據操作如:

name()返回rdd的名稱

min()返回rdd中的最小值

sum()疊加rdd中所有元素

take(n)取rdd中前n個元素

count()返回rdd的元素個數

 

更多操作請參考 :http://spark.apache.org/docs/latest/api/python/index.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM