Table of Contents
Introductory example: textFile, collect, filter, first, persist, count
Ways to create an RDD: parallelize, textFile
Transformations: map, filter, flatMap, sample, union, intersection, subtract, cartesian, distinct, groupByKey, reduceByKey, sortByKey, join, cogroup, mapValues
Actions: reduce, collect, count, first, take, takeSample, takeOrdered, saveAsTextFile, saveAsSequenceFile, countByKey, foreach, countByValue
A simple wordCount program
Introductory example: textFile, collect, filter, first, persist, count
# -*-coding=utf-8 -*-
from pyspark import SparkConf, SparkContext

sc = SparkContext('local')

lines = sc.textFile("README.md")  # textFile() creates an RDD of strings (a resilient distributed dataset); each line of the file is one element
print type(lines)  # <class 'pyspark.rdd.RDD'>
print lines.collect()[0]  # collect() turns the RDD into a list; print the first element: # Apache Spark

pythonLines = lines.filter(lambda line: "Python" in line)  # filter() returns a new RDD made of the elements for which func returns true
for i in pythonLines.collect():
    print i
'''
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
## Interactive Python Shell
Alternatively, if you prefer Python, you can use the Python shell:
'''

print type(pythonLines.collect())  # <type 'list'>
print pythonLines.first()  # get the first line: high-level APIs in Scala, Java, Python, and R, and an optimized engine that
pythonLines.persist()  # if pythonLines is reused downstream, persisting it in memory avoids recomputing it
print pythonLines.count()  # number of elements in the RDD; prints: 3
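As a side note, pythonLines.persist() above uses the default storage level. The following is a minimal sketch (my own addition, assuming the same local SparkContext and README.md as above) of choosing a storage level explicitly and releasing the cache when it is no longer needed:

# Sketch only: assumes the same local SparkContext `sc` and README.md as above.
from pyspark import StorageLevel

pythonLines = sc.textFile("README.md").filter(lambda line: "Python" in line)
pythonLines.persist(StorageLevel.MEMORY_ONLY)  # cache explicitly in memory instead of the default level
print pythonLines.count()  # the first action materializes and caches the RDD
print pythonLines.count()  # the second action reuses the cached partitions
pythonLines.unpersist()    # release the cache once downstream work is done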
Ways to create an RDD: parallelize, textFile
# -*-coding=utf-8 -*-
from pyspark import SparkConf, SparkContext

sc = SparkContext('local')

print sc.parallelize([1, 2, 3, 4]).collect()  # [1, 2, 3, 4]
print sc.textFile("README.md").collect()  # [u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides', ...
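A small additional sketch, not from the original code: parallelize() also accepts a number of partitions as a second argument, and glom() collects each partition as a separate list, which makes the partitioning visible:

# Sketch, assuming the same local SparkContext `sc` as above.
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)  # ask for 3 partitions
print rdd.getNumPartitions()                 # 3
print rdd.glom().collect()                   # e.g. [[1, 2], [3, 4], [5, 6]]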
Transformations: map, filter, flatMap, sample, union, intersection, subtract, cartesian, distinct, groupByKey, reduceByKey, sortByKey, join, cogroup, mapValues
# -*-coding=utf-8 -*-
from pyspark import SparkConf, SparkContext

sc = SparkContext('local')

# map(func): apply func to every element of the RDD and return a new RDD; elements map one-to-one
print sc.parallelize([1, 2, 3, 4]).map(lambda x: x * x).collect()  # [1, 4, 9, 16]
print sc.parallelize(["hello world", "hi"]).map(lambda line: line.split(" ")).collect()  # [['hello', 'world'], ['hi']]

# filter(func): return a new RDD made of the elements for which func returns true
print sc.parallelize([1, 2, 3, 4]).filter(lambda x: x > 2).collect()  # [3, 4]

# flatMap(func): like map, but the results are flattened
print sc.parallelize(["hello world", "hi"]).flatMap(lambda line: line.split(" ")).collect()  # ['hello', 'world', 'hi']

# sample(withReplacement, fraction, seed): randomly sample a fraction of the data using the given seed; withReplacement controls sampling with or without replacement
print sc.parallelize([1, 2, 3, 4, 5, 6, 7]).sample(True, 0.2, 1).collect()  # [1, 7, 7]

# union(RDD): union of two RDDs
print sc.parallelize([1, 2, 3]).union(sc.parallelize([3, 4, 5])).collect()  # [1, 2, 3, 3, 4, 5]

# intersection(RDD): intersection of two RDDs
print sc.parallelize([1, 2, 3]).intersection(sc.parallelize([3, 4, 5])).collect()  # [3]

# subtract(RDD): set difference
print sc.parallelize([1, 2, 3]).subtract(sc.parallelize([3, 4, 5])).collect()  # [2, 1]

# cartesian(RDD): Cartesian product of datasets T and U, i.e. every pairwise combination (T, U)
print sc.parallelize([1, 2, 3]).cartesian(sc.parallelize([3, 4, 5])).collect()  # [(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5), (3, 3), (3, 4), (3, 5)]

# distinct(): deduplicate the RDD
print sc.parallelize([1, 2, 3, 3]).distinct().collect()  # [1, 2, 3]

# groupByKey(): on a dataset of (K, V) pairs, group the values of each key and return a dataset of (K, Iterable) pairs
a = sc.parallelize({(1, 2), (3, 4), (3, 6)}).groupByKey().collect()  # [(1, Iterable), (3, Iterable)]
for i in a:
    print str(i[0]) + ":" + str(list(i[1]))  # 1:[2] ; 3:[4, 6]

# reduceByKey(func): on a dataset of (K, V) pairs, group by key and reduce the values of each key with func to a single value
print sc.parallelize({(1, 2), (3, 4), (3, 6)}).reduceByKey(lambda x, y: x + y).collect()  # [(1, 2), (3, 10)]

# sortByKey([ascending=True], [numTasks]): sort by key; ascending defaults to True (True/False for ascending/descending)
print sc.parallelize({(2, 2), (1, 4), (3, 6)}).sortByKey().collect()  # [(1, 4), (2, 2), (3, 6)]

# join(otherDataset, [numTasks]): like a SQL join; on (K, V) and (K, W) pairs it returns (K, (V, W)) tuples.
# Spark also supports outer joins: leftOuterJoin, rightOuterJoin and fullOuterJoin. Examples:
print sc.parallelize({(1, 2), (3, 4), (3, 6)}).join(sc.parallelize({(3, 7), (4, 8)})).collect()  # [(3, (4, 7)), (3, (6, 7))]
print sc.parallelize({(1, 2), (3, 4), (3, 6)}).leftOuterJoin(sc.parallelize({(3, 7), (4, 8)})).collect()  # [(1, (2, None)), (3, (4, 7)), (3, (6, 7))]
print sc.parallelize({(1, 2), (3, 4), (3, 6)}).rightOuterJoin(sc.parallelize({(3, 7), (4, 8)})).collect()  # [(4, (None, 8)), (3, (4, 7)), (3, (6, 7))]
print sc.parallelize({(1, 2), (3, 4), (3, 6)}).fullOuterJoin(sc.parallelize({(3, 7), (4, 8)})).collect()  # [(4, (None, 8)), (1, (2, None)), (3, (4, 7)), (3, (6, 7))]

# cogroup(otherDataset, [numTasks]): on (K, V) and (K, W) pairs, return (K, (Iterable, Iterable)) tuples; this operation is also called groupWith
a = sc.parallelize({(1, 2), (3, 4), (3, 6)}).cogroup(sc.parallelize({(3, 7), (4, 8)})).collect()  # [(4, (Iterable, Iterable)), (1, (Iterable, Iterable)), (3, (Iterable, Iterable))]
for i in a:
    print str(i[0]) + ":" + str(list(i[1][0])) + "," + str(list(i[1][1]))  # 4:[],[8] ; 1:[2],[] ; 3:[4, 6],[7]

# mapValues(func): apply func to the value of each (K, V) pair, leaving the key unchanged
print sc.parallelize({("panda", 0), ("pink", 3)}).mapValues(lambda x: (x, 1)).collect()  # [('pink', (3, 1)), ('panda', (0, 1))]
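As a hedged aside on the pair operations above: for aggregations, reduceByKey is usually preferred over groupByKey because it combines values inside each partition before shuffling. A minimal sketch (my own addition, assuming the same local SparkContext) showing that both pipelines give the same per-key sums:

# Sketch, assuming the same local SparkContext `sc` as above.
pairs = sc.parallelize([(1, 2), (3, 4), (3, 6)])

# groupByKey ships every value for a key across the shuffle, then we sum per key
print pairs.groupByKey().mapValues(lambda vs: sum(vs)).collect()  # [(1, 2), (3, 10)]

# reduceByKey pre-aggregates within each partition, so less data is shuffled
print pairs.reduceByKey(lambda x, y: x + y).collect()             # [(1, 2), (3, 10)]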
Actions: reduce, collect, count, first, take, takeSample, takeOrdered, saveAsTextFile, saveAsSequenceFile, countByKey, foreach, countByValue
# -*-coding=utf-8 -*-
from pyspark import SparkConf, SparkContext

sc = SparkContext('local')

# reduce(func): combine all elements of the dataset with func
print sc.parallelize([1, 2, 3, 3]).reduce(lambda x, y: x + y)  # 9

# collect(): return the dataset as a list
print sc.parallelize([1, 2, 3, 3]).collect()  # [1, 2, 3, 3]

# count(): return the number of elements in the dataset
print sc.parallelize([1, 2, 3, 3]).count()  # 4

# first(): return the first element of the dataset
print sc.parallelize([1, 2, 3, 3]).first()  # 1

# take(n): return the first n elements of the dataset
print sc.parallelize([1, 2, 3, 3]).take(2)  # [1, 2]

# takeSample(withReplacement, num, [seed]): return an array of num randomly sampled elements. Unlike sample,
# takeSample is an action, so it returns an array rather than an RDD; withReplacement controls sampling with or
# without replacement, and num is an exact number of elements, not a fraction.
print sc.parallelize([1, 2, 3, 3]).takeSample(True, 2)  # [1, 3]

# takeOrdered(n, [ordering]): return the first n elements in natural order or by the given key function
print sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)  # [1, 2, 3, 4, 5, 6]
print sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6, key=lambda x: -x)  # [10, 9, 7, 6, 5, 4]
print sc.parallelize([(10, 1), (2, 9), (3, 4), (5, 6)]).takeOrdered(6, key=lambda x: -x[1])  # [(2, 9), (5, 6), (3, 4), (10, 1)], sorted by the second field, descending

# saveAsTextFile(path): save the RDD as a text file under the directory "a"
sc.parallelize([1, 2, 3, 3]).saveAsTextFile("a")

# saveAsSequenceFile(path): save a pair RDD as a SequenceFile under the directory "b"
sc.parallelize({("11", "2"), ("103", "4"), ("103", "6")}).saveAsSequenceFile("b")

# countByKey(): for an RDD of (K, V) pairs, return a dict of (K, Int) with the count of each key
print sc.parallelize([("a", 1), ("b", 1), ("a", 1)]).countByKey()  # defaultdict(<type 'int'>, {'a': 2, 'b': 1})

# foreach(func): run func on every element of the dataset
def f(x):
    print(x * 2)
sc.parallelize([1, 2, 3, 3]).foreach(f)  # 2, 4, 6, 6

# countByValue(): count how many times each value occurs
print sc.parallelize([1, 2, 1, 2, 2]).countByValue().items()  # [(1, 2), (2, 3)]
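A small illustrative sketch (my own addition, assuming the same local SparkContext): the actions above can be combined, for example pairing reduce with count to get the mean of a numeric RDD:

# Sketch, assuming the same local SparkContext `sc` as above.
nums = sc.parallelize([1, 2, 3, 3])
total = nums.reduce(lambda x, y: x + y)  # 9
n = nums.count()                         # 4
print float(total) / n                   # 2.25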
A simple wordCount program
# -*-coding=utf-8 -*-
from pyspark import SparkConf, SparkContext

sc = SparkContext('local')

# Approach 1: flatMap + map + reduceByKey
print sc.parallelize(["hello", "hi", "hello"]).flatMap(lambda x: x.split(" ")).collect()  # ['hello', 'hi', 'hello']
print sc.parallelize(["hello", "hi", "hello"]).flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collect()  # [('hi', 1), ('hello', 2)]

# Approach 2: flatMap + countByValue
print sc.parallelize(["hello", "hi", "hello"]).flatMap(lambda x: x.split(" ")).collect()  # ['hello', 'hi', 'hello']
print sc.parallelize(["hello", "hi", "hello"]).flatMap(lambda x: x.split(" ")).countByValue()  # defaultdict(<type 'int'>, {'hi': 1, 'hello': 2})
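Putting the sections together, here is a hedged end-to-end sketch; the input file README.md and the output directory "wordcount_out" are assumptions, not part of the original program:

# Sketch: README.md and the output directory "wordcount_out" are assumptions.
counts = (sc.textFile("README.md")
            .flatMap(lambda line: line.split(" "))
            .filter(lambda word: word != "")       # drop empty tokens produced by blank lines
            .map(lambda word: (word, 1))
            .reduceByKey(lambda x, y: x + y))
print counts.take(5)                               # a few (word, count) pairs
counts.saveAsTextFile("wordcount_out")             # writes one part-* file per partition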