spark教程(四)-SparkContext 和 RDD 算子


SparkContext

SparkContext 是在 spark 庫中定義的一個類,作為 spark 庫的入口點;

它表示連接到 spark,在進行 spark 操作之前必須先創建一個 SparkContext 的實例,並且只能創建一個

利用 SparkContext 實例創建的對象都是 RDD,這是相對於 SparkSession 說的,因為 它創建的對象都是 DataFrame;

 

創建 sc

class SparkContext(__builtin__.object):
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<cl
    ass 'pyspark.profiler.BasicProfiler'>)
    '''Create a new SparkContext. At least the master and app name should be set,
     |      either through the named parameters here or through C{conf}.
     |
     |      :param master: Cluster URL to connect to
     |             (e.g. mesos://host:port, spark://host:port, local[4]).   local 表示本地運行,4 表示使用4個 cpu核
     |      :param appName: A name for your job, to display on the cluster web UI.
     |      :param sparkHome: Location where Spark is installed on cluster nodes.
     |      :param pyFiles: Collection of .zip or .py files to send to the cluster
     |             and add to PYTHONPATH.  These can be paths on the local file
     |             system or HDFS, HTTP, HTTPS, or FTP URLs.
     |      :param environment: A dictionary of environment variables to set on
     |             worker nodes.
     |      :param batchSize: The number of Python objects represented as a single
     |             Java object. Set 1 to disable batching, 0 to automatically choose
     |             the batch size based on object sizes, or -1 to use an unlimited
     |             batch size
     |      :param serializer: The serializer for RDDs.
     |      :param conf: A L{SparkConf} object setting Spark properties.
     |      :param gateway: Use an existing gateway and JVM, otherwise a new JVM
     |             will be instantiated.
     |      :param jsc: The JavaSparkContext instance (optional).
     |      :param profiler_cls: A class of custom Profiler used to do profiling
     |             (default is pyspark.profiler.BasicProfiler).
     |
     |
     |      >>> from pyspark.context import SparkContext
     |      >>> sc = SparkContext('local', 'test')
     |
     |      >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
     |      Traceback (most recent call last):
     |          ...
     |      ValueError:...'''

示例代碼

from pyspark import SparkContext, SparkConf
### method 1
conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # 設定 appname 和 master
sc = SparkContext(conf=conf)

## method 2
sc = SparkContext("spark://hadoop10:7077")

 

創建 RDD

spark 最重要的一個概念叫 RDD,Resilient Distributed Dataset,彈性分布式數據集

spark 是以 RDD 概念為中心運行的,RDD 是一個容錯的、可以被並行操作的元素集合。

創建 RDD 有三種方式:

1. 在驅動程序中並行化一個已經存在的集合    【內存中的數據】

2. 從外部存儲系統引入數據,生成 RDD      【外部存儲介質中的數據,注意 spark 本身沒有存儲功能】

  // 這個存儲系統可以是一個共享文件系統,如 hdfs、hbase

3. 從一種 RDD 轉換成 另一種 RDD

詳見我的博客 RDD 認知

 

操作 RDD

RDD 的操作有兩種方式:轉換 和 行動,而且 轉換 是 惰性的

可以根據 是否有返回 判斷是哪個操作,行動 有返回值,轉換無返回值

詳見官網 RDD

RDD 的操作也叫 RDD 算子 

 

轉換 算子

惰性,無返回值

map(func[, preservesPartitioning=False]):把一個序列中的元素逐個送入 map,經 func 處理后,返回一個新的 序列

rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: x + 1).collect()          # [3, 4, 5]

filter(func):類似 map,func 是個過濾函數

rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: x > 3).collect()          # [False, False, True]

flatMap(func[, preservesPartitioning=False]):也類似 map,只是 它會把 每次經過 func 處理的結果進行 合並,輸入和輸出的 list 長度可能不同

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()            # [1, 1, 2, 1, 2, 3]
# range(1, 2): 1
# range(1, 3): 1, 2
# range(1, 4): 1, 2, 3

### vs map
rdd.map(lambda x: range(1, x)).collect()                # [[1], [1, 2], [1, 2, 3]]

mapPartitions(func [, preservesPartitioning=False]) :map的一個變種,map 是把序列的單個元素送入 func ,而 mapPartitions 是把 序列分區后 每個 分區 整體送入 func

rdd = sc.parallelize([1,2,3,4,5], 3)    # 分 3 個區
def f(iterator): yield sum(iterator)    # 必須是生成器,即 yield,不能 return
rdd.mapPartitions(f).collect()          # [1, 5, 9]

mapPartitionsWithIndex(func [, preservesPartitioning=False]) :func 有兩個參數,分片的序號 和 迭代器,返回 分片序號,也必須是 迭代器

rdd = sc.parallelize(range(15), 13)    # 分 13 個區
def f(splitIndex, iterator): yield splitIndex
rdd.mapPartitionsWithIndex(f).collect() # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

cartesian(otherDataset):利用兩個 序列 生成 笛卡爾內積 的數據集

x = sc.parallelize([1,2,3])
y = sc.parallelize([4,5])
x.cartesian(y).collect()        # [(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]

groupBy(self, f, numPartitions=None, partitionFunc=portable_hash):輸入一個 func,每個元素都用經過 func 處理,並對處理后的結果自動分組

## 條件是分組依據,條件不影響最后的輸出格式,輸出格式仍和原數據相同
## 如 原來是 [1, 2],經過分組后分到了 第 1 組,輸出是 [1, [1, 2]], [1, 2] 完全保留

# 這個例子相當於求 奇偶數
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()     # [(0, <pyspark.resultiterable.ResultIterable object at 0x7f2f76096890>),
                                                    # (1, <pyspark.resultiterable.ResultIterable object at 0x7f2f760965d0>)]
# 解析迭代器並排序
sorted([(x, sorted(y)) for (x, y) in result])       # [(0, [2, 8]), (1, [1, 1, 3, 5])]

keyBy(self, f):利於 func 為 rdd 中每個元素設置一個 key

x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x).collect()
# [(0, 0), (1, 1), (4, 2)]

sortBy(self, keyfunc, ascending=True, numPartitions=None):根據指定函數進行排序

tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

cogroup(self, other, numPartitions=None):把 kv 對按 k 進行收集

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]

 

以下方法只適用 key-value 數據

keys(self),values(self)

m = sc.parallelize([(1, 2), (3, 4)]).keys()
m.collect()    # [1, 3]

mapValues(func):根據 func 處理 value

rdd = sc.parallelize([(1, [1,2,3]), (3, ['a', 'b'])])
rdd.mapValues(len).collect()            # [(1, 3), (3, 2)]  計算 value 的長度

reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]):針對 k-v 對的處理方法,把 key 相同的 value 進行 reduce,然后重新組成 key-reduce 對

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
def f(x, y): return x + y
rdd.reduceByKey(f).collect()        # [('a', 2), ('b', 1)]

groupByKey(self, numPartitions=None, partitionFunc=portable_hash):根據 key 進行分組

 rdd.groupByKey().collect()
# [('a', <pyspark.resultiterable.ResultIterable object at 0x7f16e3c0bd90>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7f16e3c0bcd0>)]
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
# [('a', [1, 1]), ('b', [1])]

sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]):根據 key 進行排序,默認升序,numPartitions 代表分區數,keyfunc 是處理 key 的,在 排序過程中對 key 進行處理

tmp = [('a', 4), ('b', 3), ('c', 2), ('D', 1)]
sc.parallelize(tmp).sortByKey(True, 1).collect()    # 升序[('D', 1), ('a', 4), ('b', 3), ('c', 2)] 1代表分區數
sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect()   # 升序[('a', 4), ('b', 3), ('c', 2), ('D', 1)] D跑到后面了

sc.parallelize(tmp).sortByKey(False, 2, keyfunc=lambda k:k.lower()).collect()# 降序[('D', 1), ('c', 2), ('b', 3), ('a', 4)]

keyfunc 只在 排序過程中起作用,在輸出時 keyfunc 不起作用

join(otherDataset [, numPartitions=None]):將 兩個 k-v RDD 中 共有的 key 的 value 交叉組合

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.join(y).collect()     # [('a', (1, 2)), ('a', (1, 3))]

join 是內連接,也就是共有的 key 進行組合;

還有 leftOuterJoin 左連接,rightOuterJoin 右連接,fullOuterJoin 全連接

groupWith(other, *others):把多個 RDD 的 key 進行分組;輸出 (key,迭代器)

分組后的數據是有順序的,每個 key 對應的 value 是按 原本 RDD 的順序的,如果原本 RDD 沒有這個 key,留空

w = sc.parallelize([("a", 5), ("b", 6)])
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = sc.parallelize([("b", 42)])
w.groupWith(x, y, z).collect()

[(x, tuple(map(list, y))) for x, y in list(w.groupWith(x, y, z).collect())]     # [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

 

行動 算子

有返回值

count:返回 RDD 中元素個數

first:返回 RDD 中第一個元素

max. min.sum:不解釋

take(n):返回 RDD 中 前 n 個元素

collect:返回 RDD 中的數據

要注意的是,collect 相當於把 worker 的數據轉移給驅動程序來顯示,如果數據量過大,可能導致驅動程序崩潰

takeOrdered(n [, key=None]):對 RDD 先進行排序,然后取排序后的 前 n 個數據,key 表示先經過 keyfunc 處理后再進行排序,最終返回的還是原數據

sc.parallelize([9,7,3,2,6,4]).takeOrdered(3)    # [2, 3, 4]
sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x: -x)   # [9, 7, 6]
## 過程如下
#  9,  7,  3,  2,  6,  4  ## 原數據
# -9, -7, -3, -2, -6, -4  ## 經過 keyfunc 處理后的數據
# -9, -7, -6, -4, -3, -2  ## 對處理后的數據升序排序
# -9, -7, -6              ## 取前3個
#  9,  7,  6              ## 對應到原數據

也就是說,keyfunc 只在排序時起作用,在輸出時不起作用

foreach(func):運行 func 函數 並行處理 RDD 的所有元素

與 map 的區別在於一個是 轉換算子,一個是行動算子    【曾經面試被問過這個問題,從功能上講兩個函數好像一樣的,當時沒答上,心碎】

sc.parallelize([1, 2, 3, 4, 5]).foreach(print)  # 並行打印,不按順序輸出
# 1
# 2
# 4
# 5
# 3

reduce(func):把 RDD 中前兩個元素送入 func,得到一個 value,把這個 value 和 下一個元素 送入 func,直至最后一個元素

sc.parallelize([1,2,3,4,5]).reduce(lambda x, y: x + y)  # 15 求和

fold:與 reduce 類似,fold 是有一個 基數,然后 把每個元素 和 基數 送入 func,然后替換該基數,循環,直到最后一個元素

x = sc.parallelize([1,2,3])
neutral_zero_value = 0  # 0 for sum, 1 for multiplication
y = x.fold(neutral_zero_value, lambda obj, accumulated: accumulated + obj) # computes cumulative sum
print(x.collect())  # [1,2,3]
print(y)            # 6

aggregate:對每個分區進行聚合,然后聚合每個分區的聚合結果,詳見我的博客 aggregate

countByValue:統計相同元素的個數

sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items()    # [(1, 2), (2, 4), (3, 3), (5, 1)]

# 輸入 k-v 不按 value 統計,按 k-v 統計
sc.parallelize([('a', 1), ('b', 1)]).countByValue().items()     # [(('a', 1), 1), (('b', 1), 1)]

saveAsTextFile(path [, compressionCodecClass=None]):把 RDD 存儲到文件系統中

counts.saveAsTextFile('/usr/lib/spark/out')

輸入必須是 路徑,且該路徑不能事先存在

 

以下方法只適用 key-value 數據

countByKey:統計相同 key 的個數,返回 key-count 

sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey()   # defaultdict(<type 'int'>, {'a': 2, 'b': 1})

dictdata= sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey()
dictdata.items()        # [('a', 2), ('b', 1)]

lookup(self, key):輸入 key,返回對應的所有 value

rdd = sc.parallelize([('a', 1), ('b',3), ('a', 5)])
rdd.lookup('a')    # [1, 5]

 

RDD 緩存

把 RDD 緩存到 內存中, 惰性執行

distFile = sc.textFile('README.md')
m = distFile.map(lambda x: len(x))      # map 是 轉換 操作,並不立即執行
m.cache()       # 把 map 的輸出緩存到內存中,其實 cache 就是 執行 操作
# 或者 m.persist()

緩存除了存儲到內存中,還有一個非常重要的作用,復用以提高效率;

緩存的 RDD 不僅可以直接拿來轉換成新的 RDD,還可以多次利用;

我們知道 RDD 是有血緣關系的,即一個 RDD 由另一個 RDD 轉換得來,而這種關系可能是多層的;

 

 

如果一個 RDD 緩存了,spark 會執行到目前為止所有轉換操作,並為生成的 RDD 創建一個檢查點,

但是由於 緩存 是惰性操作,緩存只會在第一次 行動 操作后創建,且第一次 行動 操作不受益,第二次三次直接調用緩存才受益;

緩存適合多次使用的數據,只用一次的無需緩存;

緩存適合大數據,小數據無需緩存

 

persist(self, storageLevel=StorageLevel.MEMORY_ONLY):這個方法加入了 存儲等級,可以把 RDD 緩存在內存或者磁盤上,參數可選,如果沒有,等於 cache,存於內存中

參數取值: MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK 等

 

RDD 緩存可容錯,即有備份的;

 

RDD 保存

保存到文件等存儲介質中,行動操作

saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None)

saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)

saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None)

saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)

saveAsSequenceFile(self, path, compressionCodecClass=None)

saveAsPickleFile(self, path, batchSize=10)

saveAsTextFile(self, path, compressionCodecClass=None)

 

 

參考資料:

https://www.cnblogs.com/yangzhang-home/p/6056133.html  快速入門

https://blog.csdn.net/kl28978113/article/details/80361452  較全教程

http://spark.apache.org/docs/latest/    spark 2.4.4 官網

http://spark.apache.org/docs/latest/api/python/index.html    spark 2.4.4 python API

https://www.cnblogs.com/Vito2008/p/5216324.html

https://blog.csdn.net/proplume/article/details/79798289

https://www.iteblog.com/archives/1396.html#aggregate  RDD 操作 API

https://www.cnblogs.com/yxpblog/p/5269314.html    RDD 操作 API


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM