本來應該上周更新的,結果碰上五一,懶癌發作,就推遲了 = =。以后還是要按時完成任務。廢話不多說,第四章-第六章主要講了三個內容:鍵值對、數據讀取與保存與Spark的兩個共享特性(累加器和廣播變量)。
鍵值對(PaiRDD)
1.創建
1 #在Python中使用第一個單詞作為鍵創建一個pairRDD,使用map()函數 2 pairs = lines.map(lambda x:(x.split(" ")[0],x))
2.轉化(Transformation)
轉化操作很多,有reduceByKey,foldByKey(),combineByKey()等,與普通RDD中的reduce()、fold()、aggregate()等類似,只不過是根據鍵來進行操作。
reduceByKey():與recude()類似,只不過是根據鍵進行聚合
foldByKey():與fold()類似
combineByKey():與aggregate()類似
1 #用Python對第二個元素進行篩選 2 result = pairs.filter(lambda keyValue:len(keyValue[1]) < 20) 3 4 #在Python中使用reduceByKey()和mapValues()計算每個鍵對應的平均值 5 rdd.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) 6 7 #用Python實現單詞計數 8 rdd.sc.textFile("文件地址") 9 words = rdd.flatMap(lambda x:x.split(" ")) 10 result = words.map(lambda x:(x,1)).reduceByKey((x,y)=>x+y) 11 12 #在Python中使用combineByKey()求每個鍵對應的平均值 13 sumCount = nums.combineByKey((lambda x:(x,1)), 14 (lambda x,y:(x[0]+y,x[1]+1)), 15 (lambda x,y:(x[0]+y[0],x[1]+y[1]))) 16 sumCount.map(lambda key,xy:(key.xy[0]/xy[1])).collectAsMap() 17 18 #在Python中自定義reduceByKey()的並行度 19 data = [("a",3),("b",4),("a",1)] 20 sc.parallelize(data).reduceByKey(lambda x,y:x+y)#默認並行度 21 sc.parallelize(data).reduceByKey(lambda x,y:x+y,10)#自定義並行度 22 23 #在Python中以字符串順序對整數進行自定義排序 24 rdd.sortByKey(ascending = True,numPartitions = None,keyFunc = lambda x: str(x))
3.行動操作(Action)
數據分區:數據比較大時,可以用partitionBy()轉化為哈希分區。即通過向partitionBy傳遞一個spark.HashPartitioner對象來實現該操作。在Python中不能將HashPartitioner對象傳遞給partitionBy,只需要把需要的分區數傳遞過去(如 rdd.partitionBy(100))。
在spark中,會為生成的結果RDD設好分區方式的操作有:cogroup(),groupWith(),join(),leftOuterJoin(),rightOutJoin,groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues(),flatMapValues(),filter()。最后三種只有當父RDD有分區方式時,結果RDD才會有分區RDD。其他的操作生成的結果都不會存在特定的分區方式。
自定義分區方式:
#Python自定義分區方式 import urlparse def hash_domain(url): return hash(urlparse.urlparse(url).netloc) rdd.partitionBy(20,hash_domain) #創建20個分區
數據的讀取與保存
文件格式
格式名稱 | 結構化 | 備注 |
文本文件 |
否 | 普通的文本文件,每行一條記錄 |
JSON |
半結構化 | 常見的基於文本的格式,半結構化;大多數庫要求每行一條記錄 |
CSV |
是 |
常見文本結構 |
SequenceFile |
是 | 一種用於鍵值對數據的常見Hadoop文件格式 |
Protocol buffers |
是 |
一種快讀、節約空間的跨語言格式 |
對象文件 |
是 | 用來將Spark作業中的數據存儲下來以讓共享的代碼讀取。改變類的時候回失效。因為它依賴於Java序列化 |
文本文件
1 #讀取文本文件 2 input=sc.textFile("文件地址") 3 #保存文本文件 4 result.saveAsTextFile(outputFile)
JSON
1 #讀取Jason 2 import json 3 data = input.map(lambda x: json.loads(x)) 4 #保存 5 (data.filter(lambda x : x["lovaPandas"]).map(lambda x:json.dumps(x))).saveAsTextFile(outputF
CSV文件
1 #用textFile讀取csv 2 import csv 3 import StringIO 4 def loadRecord(line): 5 """解析一行csv記錄""" 6 input = StringIO.StringIO(line) 7 reader = csv.DictReader(input,filenames =["name","favouriteAnimal"]) 8 return reader.next() 9 input = sc.textFile(inputFile).map(loadRecord) 10 11 #讀取完整csv 12 def loadRecords(filenameContents): 13 """讀取給定文件中的所有記錄""" 14 input = StringIO.StringIO(filenameContents[1]) 15 reader = csv.DictReader(input,fieldnames = ["name","favouriteAnimal"]) 16 return reader 17 fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords) 18 19 #保存csv 20 def writeRecords(records): 21 """寫出一些csv記錄""" 22 output = StringIO.StringIO() 23 writer = csv.DictReader(output,filenames = ["name","favouriteAnimal"]) 24 for record in records: 25 writer.writerow(record) 26 return [output.getvalue()] 27 pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
SequenceFile
1 #讀取SequenceFile 2 val data = sc.sequenceFile(inFile,"ord.apache.hadoop.io.Text","org.apache.hadoop.io.InWitable")
對象文件
1 #對象文件,用Java序列化寫的,速度慢,保存用saveAsObjectFile(),讀取用 SparkContext中的objectFile()函數接收一個路徑,返回對應的RDD。它無法在Python中使用
Spark SQL中的結構化數據
Apache Hive
1 #Apache Hive 2 #用Python創建HiveContext並查詢數據 3 from pyspark.sql import HiveContext 4 5 hiveCtx =HiveContext(sc) 6 rows = hiveCtx.sql("SELECT name, age FROM users ") 7 firstRow = rows.first() 8 print firstRow.name
JSON數據
1 #JSON數據示例 2 {"user":{"name":"Holden","location":"San Francisco"},"text":"Nice day out today"} 3 {"user":{"name":"Matei","location":"Berkeley"},"text":"Even nicer here :)"} 4 5 #在Python中使用SparkSQL讀取JSON數據 6 tweets = hiveCtx.jsonFile("tweets.json") 7 # UserWarning: jsonFile is deprecated. Use read.json() instead. 8 # warnings.warn("jsonFile is deprecated. Use read.json() instead.") 9 10 tweets.registerTempTable("tweets") 11 results = hiveCtx.sql("SELECT user.name,text FROM tweets")
這章關於sql的命令比較少,關於SQL的其他命令可以看看Spark的官方文檔(PySpark 1.6.1 documentation),講的比較詳細。注意,這是spark 1.6版本,如果你安裝的是1.2版本,1.6的有些命令是用不了的,可以先升級再用。
最后再來講講Spark中兩種類型的共享變量:累加器(accumulator)和廣播變量(broadcast variable)
累加器:對信息進行聚合。常見得一個用法是在調試時對作業執行進行計數。舉個例子:假設我們從文件中讀取呼號列表對應的日志,同時也想知道輸入文件中有多少空行,就可以用到累加器。實例:
1 #一條JSON格式的呼叫日志示例 2 #數據說明:這是無線電操作者的呼叫日志。呼號由國家分配,每個國家有自己呼號號段,所以可以根據呼號查到相對應的國家。有一些呼叫日志中包含操作者的地理位置,用來幫助確定距離 3 {"address":"address here","band":"40m","callSigns":"KK6JLK","city":"SUNNYVALE", 4 "contactlat":"37.384733","contactlong":"-122.032164", 5 "county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin", 6 "id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}
1 #在Python中累加空行 2 file = sc.textFile(inputFile) 3 #創建Accumulator[int] 並初始化為0 4 blankLines = sc.accumulator(0) 5 6 def extractCallSigns(line): 7 global blankLines #訪問全局變量 8 if (line == ""): 9 blankLines += 1 10 return line.split(" ") 11 12 callSigns = file.flatMap(extractCallSigns) 13 callSigns.saveAsTextFile(outputDir + "/callSigns") 14 print "Blank Lines:%d " % blankLines.value
我們來看看這段程序,首先創建了一個叫做blankLines的Accumulator[Int]對象,然后在輸入中看到空行就+1,執行完轉化操作后就打印出累加器中的值。注意:只有在執行完saveAsTextFile()這個action操作后才能看到正確的計數,flatMap()是transformation操作,是惰性的,這點在上一篇博文已經講過。
但是我們上一篇文章中也提到過reduce()等這樣的操作也是聚合操作,那為什么還有累加器這個東西存在呢?因為RDD本身提供的同步機制粒度太粗,尤其在transformation操作中變量狀態不能同步,而累加器可以對那些與RDD本身的范圍和粒度不一樣的值進行聚合,不過它是一個write-only的變量,無法讀取這個值,只能在驅動程序中使用value方法來讀取累加器的值。
累加器的用法:
- 通過在驅動器中調用SparkContext.accumulator(initialValue)方法,創建出存有初始值的累加器。返回值為org.apache.spark.Accumulator[T]對象,其中T是初始值initialValue的類型。
- Spark閉包里的執行器代碼可以使用累加器的 += 方法(在Java中是add)增加累加器的值。
- 驅動器程序可以調用累加器的Value屬性來訪問累加器的值(在Java中使用value()或setValue())
對於之前的數據,我們可以做進一步計算:
1 #在Python中使用累加器進行錯誤計數 2 #創建用來驗證呼號的累加器 3 validSignCount = sc.accumulator(0) 4 invalidSignCount = sc.accumulator(0) 5 6 def validataSign(sign): 7 global validSignCount,invalidSignCount 8 if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z",sign): 9 validSignCount += 1 10 return True 11 else: 12 invalidSignCount += 1 13 return False 14 15 #對與每個呼號的聯系次數進行計數 16 validSigns = callings.filter(validataSign) 17 contactCount = validSigns.map(lambda sign:(sign,1)).reduceByKey(lambda (x,y):x+y) 18 19 #強制求值計算計數 20 contactCount.count() 21 if validSignCount.value < 0.1 * validSignCount.value: 22 contactCount.saveAsTextFile(outputDir + "/contactCount") 23 else: 24 print "Too many errors: %d in %d" %(invalidSignCount.value,validSignCount.value)
累加器與容錯性:
我們知道Spark是分布式計算,當有些機器執行得比較慢或者出錯的時候,Spark會自動重新執行這些失敗的或比較慢的任務。這樣會導致同一個函數可能對同一個數據運行了多次,簡單的說就是耗內存,降低了計算速度。在這種情況下,累加器怎么處理呢?
對於要在Action操作中使用的累加器,Spark只會把每個任務對累加器的修改應用一次,一般放在foreach()操作中。而對於Transformation操作中的累加器,可能不止更新一次。所以Transformation中的累加器最好只在調試中使用。
廣播變量
廣播變量允許程序員緩存一個只讀的變量在每台機器上面,而不是每個任務保存一份拷貝。利用廣播變量,我們能夠以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。廣播變量通過兩個方面提高數據共享效率:1,集群中每個節點(物理機器)只有一個副本,默認的閉包是每個任務一個副本;2,廣播傳輸是通過BT下載模式實現的,也就是P2P下載,在集群多的情況下,可以極大的提高數據傳輸速率。廣播變量修改后,不會反饋到其他節點。
在Spark中,它會自動的把所有引用到的變量發送到工作節點上,這樣做很方便,但是也很低效:一是默認的任務發射機制是專門為小任務進行優化的,二是在實際過程中可能會在多個並行操作中使用同一個變量,而Spark會分別為每個操作發送這個變量。舉個例子,假設我們通過呼號的前綴查詢國家,用Spark直接實現如下:
1 #在Python中查詢國家 2 #查詢RDD contactCounts中的呼號的對應位置,將呼號前綴讀取為國家前綴來進行查詢 3 signPrefixes = loadCallSignTable() 4 5 def processSignCount(sign_count,signPrefixes): 6 country = lookupCountry(sign_count[0],signPrefixes) 7 count = sign_count[1] 8 return (country,count) 9 10 countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y)))
數據量小的時候可以運行,但是如果這個表很大,signPrefixes的很容易達到MB級別,從主節點為每個任務發送這樣的數組會非常消耗內存,而且如果之后還需要用到signPrefixes這個變量,還需要再向每個節點發送一遍。
如果把signPrefixes變為廣播變量,就可以解決這個問題:
1 #在Python中使用廣播變量來查詢國家 2 #查詢RDD contactCounts中的呼號的對應位置,將呼號前綴讀取為國家前綴來進行查詢 3 signPrefixes = sc.broadcast(loadCallSignTable()) 4 5 6 def processSignCount(sign_count,signPrefixes): 7 country = lookupCountry(sign_count[0],signPrefixes.value) 8 count = sign_count[1] 9 return (country,count) 10 11 countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y))) 12 13 countryContactCounts.saveAsTextFile(outputDir +"/contries.txt")
總結一下廣播變量的過程:
- 通過對一個類型T的對象調用SparkContext.broadcast創建一個Broadcast[T]對象。任何可序列化的對象都可以這么實現。
- 通過value屬性訪問該對象的值
- 變量只會發到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節點)。
廣播的優化
如果廣播的值比較大,可以選擇既快又好的序列化格式。Scala和Java API中默認使用Java序列化庫,對於除基本類型的數組以外的任何對象都比較低效。我們可以使用spark.serializer屬性選擇另一個序列化庫來優化序列化過程。(也可以使用reduce()方法為Python的pickle庫自定義序列化)
基於分區進行操作
兩個函數:map() 和 foreach()
函數名 | 調用所提供的 | 返回的 | 對於RDD[T]的函數簽名 |
mapPartitions() | 該分區中元素的迭代器 | 返回的元素的迭代器 | f:(Iterator[T])->Iterator[U] |
mapPartitionsWithIndex() | 分區序號,以及每個分區中的元素的迭代器 | 返回的元素的迭代器 | f:(Int,Iterator[T])->Iterator[U] |
foreachPartitions() | 元素迭代器 | 無 | f:(Iterator[T])->Unit |
示例:我們有一個在線的電台呼號數據,可以通過這個數據庫查詢日志中記錄過的聯系人呼號列表。
1 #在Python中使用共享連接池 2 def processCallSigns(signs): 3 """使用連接池查詢呼號""" 4 #創建一個連接池 5 http = urllib3.PoolManager() 6 #與每條呼號記錄相關的URL 7 urls = map(lambda x: "http://73s.com/qsos/%s.json" % x,signs) 8 #創建請求(非阻塞) 9 requests = map(lambda x:(x,http.request('GET',x)),urls) 10 #獲取結果 11 result = map(lambda x:(x[0],json.loads(x[1].data)),requests) 12 #刪除空的結果並返回 13 return filter(lambda x:x[1] is not None,result) 14 15 def fetchCallSigns(input): 16 """獲取呼號""" 17 return input.mapPartitions(lambda callsigns:processCallSigns(callsigns)) 18 19 contactsCountList = fetchCallSigns(validSigns)
再舉個例子說明一下mapPartitions()的功能:
1 #在Python中不實用mapPartitions()求平均值 2 def combineCtrs(c1,c2): 3 return (c1[0]+c2[0],c1[1]+c2[1]) 4 5 def basicAvg(nums): 6 """計算平均值""" 7 nums.map(lambda num:(num,1)).reduce(combineCtrs) 8 9 10 11 #在Python中使用mapPartitions()求平均值 12 def partitionCtr(nums): 13 """計算分區的sumCounter""" 14 sumCount = [0,0] 15 for num in nums: 16 sumCount[0] +=num 17 sumCount[1] +=1 18 return [sumCount] 19 20 def fastAvg(nums): 21 """計算平均值""" 22 sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs) 23 return sumCount[0]/float(sumCount[1])
數值RDD的操作
方法 | 含義 |
count() |
RDD中的元素個數 |
mean() |
元素的平均值 |
sum() |
總和 |
max() |
最大值 |
min() |
最小值 |
variance() |
元素的方差 |
sampleVariance() |
采樣的方差 |
stdev() |
標准差 |
sampleStdev() |
采樣的標准差 |
舉例:從呼叫日志中移除距離過遠的聯系點
1 #用Python移除異常值 2 #要把String類型的RDD轉化為數字數據,這樣才能使用統計函數並移除異常值 3 distanceNumerics = distances.map(lambda string :float(string)) 4 stats = distanceNumerics.stats() 5 stddev = stdts.stdev() 6 mean =stats.mean() 7 reasonableDistances = distanceNumerics.filter(lambda x:math.fabs(x-mean) < 3 * stddev) 8 print reasonableDistances.collect()
這三章的內容比較實用,在生產中也會有實際應用。下周更新第7-9章,主要講Spark在集群上的運行、Spark調優與調試和Spark SQL。