1. map(func)
將func函數作用到數據集的每個元素,生成一個新的分布式的數據集並返回
1 >>> a = sc.parallelize(('a', 'b', 'c')) 2 >>> a.map(lambda x: x+'1').collect() 3 ['a1', 'b1', 'c1']
2. filter(func)
選出所有func返回值為true的元素,作為一個新的數據集返回
1 >>> a = sc.parallelize(range(10)) 2 >>> a.filter(lambda x: x%2==0).collect() # 選出0-9的偶數 3 [0, 2, 4, 6, 8]
3. flatMap(func)
與map相似,但是每個輸入的item能夠被map到0個或者更多的items輸出,也就是說func的返回值應當是一個Sequence,而不是一個單獨的item
1 >>> l = ['I am Tom', 'She is Jenny', 'He is Ben'] 2 >>> a = sc.parallelize(l,3) 3 >>> a.flatMap(lambda line: line.split()).collect() # 將每個字符串中的單詞划分出來 4 ['I', 'am', 'Tom', 'She', 'is', 'Jenny', 'He', 'is', 'Ben']
4. mapPartitions(func)
與map相似,但是mapPartitions的輸入函數單獨作用於RDD的每個分區(block)上,因此func的輸入和返回值都必須是迭代器iterator。
例如:假設RDD有十個元素0~9,分成三個區,使用mapPartitions返回每個元素的平方。如果使用map方法,map中的輸入函數會被調用10次,而使用mapPartitions方法,輸入函數只會被調用3次,每個分區被調用1次。
1 >>> def squareFunc(a): 2 . . . for i in a: 3 . . . yield i*i 4 . . . 5 >>> a = sc.parallelize(range(10), 3) 6 PythonRDD[1] at RDD at PythonRDD.scala:48 7 >>> a.mapPartitions(squareFunc).collect() 8 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
5. mapPartitionsWithIndex(func)
與mapPartitions相似,但是輸入函數func提供了一個正式的參數,可以用來表示分區的編號。
1 >>> def func(index, iterator): # 返回每個分區的編號和數值 2 . . . yield (‘index ‘ + str(index) + ’ is: ‘ + str(list(iterator))) 3 . . . 4 >>> a = sc.parallelize(range(10),3) 5 >>> a.mapPartitionsWithIndex(func).collect() 6 ['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]'] 7 >>> def squareIndex(index, iterator): # 返回每個數值所屬分區的編號和數值的平方 8 ... for i in iterator: 9 ... yield ("The index is: " + str(index) + ", and the square is: " + str(i*i)) 10 ... 11 >>> a.mapPartitionsWithIndex(squareIndex).collect() 12 ['The index is: 0, and the square is: 0',
'The index is: 0, and the square is: 1',
'The index is: 1, and the square is: 4',
'The index is: 1, and the square is: 9',
'The index is: 1, and the square is: 16',
'The index is: 2, and the square is: 25',
'The index is: 2, and the square is: 36',
'The index is: 3, and the square is: 49',
'The index is: 3, and the square is: 64',
'The index is: 3, and the square is: 81']
6. sample(withReplacement, fraction, seed)
從數據中抽樣,withReplacement表示是否有放回,withReplacement=true表示有放回抽樣,fraction為抽樣的概率(0<=fraction<=1),seed為隨機種子。
例如:從1-100之間抽取樣本,被抽取為樣本的概率為0.2
1 >>> data = sc.parallelize(range(1,101),2) 2 >>> sample = data.sample(True, 0.2) 3 >>> sampleData.count() 4 19 5 >>> sampleData.collect() 6 [16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]
!!!注意,Spark中的sample抽樣,當withReplacement=True時,相當於采用的是泊松抽樣;當withReplacement=False時,相當於采用伯努利抽樣,fraction並不是表示抽樣得到的樣本占原來數據總量的百分比,而是一個元素被抽取為樣本的概率。fraction=0.2並不是說明要抽出100個數字中20%的數據作為樣本,而是每個數字被抽取為樣本的概率為0.2,這些數字被認為來自同一總體,樣本的大小並不是固定的,而是服從二項分布。
7. union(otherDataset)
並集操作,將源數據集與union中的輸入數據集取並集,默認保留重復元素(如果不保留重復元素,可以利用distinct操作去除,下邊介紹distinct時會介紹)。
1 >>> data1 = sc.parallelize(range(10)) 2 >>> data2 = sc.parallelize(range(6,15)) 3 >>> data1.union(data2).collect() 4 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]
8. intersection(otherDataset)
交集操作,將源數據集與union中的輸入數據集取交集,並返回新的數據集。
1 >>> data1 = sc.parallelize(range(10)) 2 >>> data2 = sc.parallelize(range(6,15)) 3 >>> data1.intersection(data2).collect() 4 [8, 9, 6, 7]
9. distinct([numTasks])
去除數據集中的重復元素。
1 >>> data1 = sc.parallelize(range(10)) 2 >>> data2 = sc.parallelize(range(6,15)) 3 >>> data1.union(data2).distinct().collect() 4 [0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]
下邊的一系列transactions會用的鍵(Key)這一概念,在進行下列有關Key操作時使用的數據集為記錄倫敦各個片區(英文稱為ward)中學校和學生人數相關信息的表格,下載地址:
https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685#
下載后將其中命名為WardtoSecSchool_LDS_2015的sheet里邊的數據保存為csv格式,刪除第一行的表頭,並重新命名為school.csv
數據格式為:
(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count)
首先對數據進行一些預處理:
1 >>> school = sc.textFile("file:///home/yang/下載/school.csv") 2 Data = sc.textFile("file:///home/yang/下載/school.csv") 3 >>> school.count() # 共有16796行數據 4 16796 5 >>> import re # 引入python的正則表達式包 6 >>> rows = school.map(lambda line: re.subn(',[\s]+',': ', line))
注意:1. 從本地讀取數據時,代碼中要通過 “file://” 前綴指定讀取本地文件。Spark shell 默認是讀取 HDFS 中的文件,需要先上傳文件到 HDFS 中,否則會有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv”的錯誤。
2. 對數據集進行了一下預處理,利用正則匹配替換字符串,由於一些學校的名字的字符串中本身含有逗號,比如“The City Academy, Hackney”, 此時如果利用csv的分隔符’,’進行分割,並不能將名字分割為“The City Academy”和“Hackney”。我們注意到csv的分隔符逗號后邊是沒有空格的,而名字里邊的逗號后邊都會有空格(英語書寫習慣),因此,先利用re.subn語句對逗號后邊含有至少一個空格(正則表達式為’,[\s]+’)的子字符串進行替換,替換為’: ’,然后再進行后續操作。以上即為對這一數據集的預處理過程。
10. groupByKey([numTasks])
作用於由鍵值對(K, V)組成的數據集上,將Key相同的數據放在一起,返回一個由鍵值對(K, Iterable)組成的數據集。
注意:1. 如果這一操作是為了后續在每個鍵上進行聚集(aggregation),比如sum或者average,此時使用reduceByKey或者aggregateByKey的效率更高。2. 默認情況下,輸出的並行程度取決於RDD分區的數量,但也可以通過給可選參數numTasks賦值來調整並發任務的數量。
1 >>> newRows = rows.map(lambda r: r[0].split(',')) 2 >>> ward_schoolname = newRows .map(lambda r: (r[1], r[5])).groupByKey() # r[1]為ward的名字,r[5]為學校的名字 3 >>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect() # 列出每個ward區域內所有的學校的名字 4 [{'Stifford Clays': ['William Edwards School', 'Brentwood County High School', "The Coopers' Company and Coborn School", 'Becket Keys Church of England Free School', ...]
# 輸出結果為在Stifford Clays這個ward里的學校有William Edwards School,Brentwood County High School,The Coopers' Company and Coborn School等等...
11. reduceByKey(func, [numTasks])
作用於鍵值對(K, V)上,按Key分組,然后將Key相同的鍵值對的Value都執行func操作,得到一個值,注意func的類型必須滿足
1 >>> pupils = newRows.map(lambda r: (r[1], int(r[6]))) # r[1]為ward的名字,r[6]為每個學校的學生數 2 >>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y) # 計算各個ward中的學生數 3 >>> ward_pupils.collect() # 輸出各個ward中的學生數 4 [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526),
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]
12. aggregateByKey(zeroValue, seqOp, comOp, [numTasks])
在於鍵值對(K, V)的RDD中,按key將value進行分組合並,合並時,將每個value和初始值作為seqOp函數的參數,進行計算,返回的結果作為一個新的鍵值對(K, V),然后再將結果按照key進行合並,最后將每個分組的value傳遞給comOp函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給comOp函數,以此類推),將key與計算結果作為一個新的鍵值對(K, V)輸出。
例子: 上述統計ward內學生人數的操作也可以通過aggregateByKey實現,此時,seqOp和comOp都是進行加法操作,代碼如下:
1 >>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y) 2 >>> ward_pupils.collect() 3 [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526),
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]
13. sortByKey([ascending=True], [numTasks])
按照Key進行排序,ascending的值默認為True,True/False表示升序還是降序
例如:將上述ward按照ward名字降序排列,打印出前十個
1 >>> ward_pupils.sortByKey(False, 4).take(10) 2 [('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204),
('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116), ('Wilmington', 2243),
('Willesden Green', 1896), ('Whitefoot', 676), ('Whalebone', 2294)]
14. join(otherDataset, [numTasks])
類似於SQL中的連接操作,即作用於鍵值對(K, V)和(K, W)上,返回元組 (K, (V, W)),spark也支持外連接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin。例子:
1 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended')) 2 >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended')) 3 >>> class1.join(class2).collect() 4 [('Tom', ('attended', 'attended'))] 5 >>> class1.leftOuterJoin(class2).collect() 6 [('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))] 7 >>> class1.rightOuterJoin(class2).collect() 8 [('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))] 9 >>> class1.fullOuterJoin(class2).collect() 10 [('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None)), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
15. cogroup(otherDataset, [numTasks])
作用於鍵值對(K, V)和(K, W)上,返回元組 (K, (Iterable, Iterable))。這一操作可叫做groupWith。
1 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended')) 2 >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended')) 3 >>> group = class1.cogroup(class2) 4 >>> group.collect() 5 [('John', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)),
('Tom', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)),
('Jenny', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)),
('Bob', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)),
('Amy', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)),
('Alice', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))] 6 >>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect() 7 [{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]}, {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]}, {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]
16. cartesian(otherDataset)
笛卡爾乘積,作用於數據集T和U上,返回(T, U),即數據集中每個元素的兩兩組合
1 >>> a = sc.parallelize(('a', 'b', 'c')) 2 >>> b = sc.parallelize(('d', 'e', 'f')) 3 >>> a.cartesian(b).collect() 4 [('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'), ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]
17. pipe(command, [envVars])
將驅動程序中的RDD交給shell處理(外部進程),例如Perl或bash腳本。RDD元素作為標准輸入傳給腳本,腳本處理之后的標准輸出會作為新的RDD返回給驅動程序。
18. coalesce(numPartitions)
將RDD的分區數減小到numPartitions個。當數據集通過過濾規模減小時,使用這個操作可以提升性能。
19. repartition(numPartitions)
重組數據,數據被重新隨機分區為numPartitions個,numPartitions可以比原來大,也可以比原來小,平衡各個分區。這一操作會將整個數據集在網絡中重新洗牌。
20. repartitionAndSortWithinPartitions(partitioner)
根據給定的partitioner函數重新將RDD分區,並在分區內排序。這比先repartition然后在分區內sort高效,原因是這樣迫使排序操作被移到了shuffle階段。