一、count
sql = """select video_id,count(video_id) as video_num from video_table group by video_id order by video_num desc"""
rdd = spark.sql(sql).rdd.map(lambda x: x["video_id"])
result = rdd.collect()
二、sortBy和sortByKey
from operator import add
sql = """select video_id from video_table """
rdd = spark.sql(sql).rdd.map(lambda x: (x["video_id"],1))..reduceByKey(add)
rdd1 = rdd.sortBy(lambda x: x[1], ascending=False)
rdd2 = rdd.sortByKey(lambda x: x, ascending=False)
result = rdd1.collect() # rdd2.collect()
1、sortBy如何實現全局排序
sortBy實際上調用sortByKey
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
2、sortBy的實現過程:
Stage 0:Sample。創建 RangePartitioner,先對輸入的數據的key做sampling來估算key的分布情況,然后按指定的排序切分出range,盡可能讓每個partition對應的range里的key的數量均勻。計算出來的 rangeBounds 是一個長為 numPartitions - 1 的list,記錄頭 numPartitions - 1 個partition對應的range的上界;最后一個partition的邊界就隱含在“剩余”當中。
rddSize = self.count() # 統計rdd中包含的元素個數,假設rddSize=10000
if not rddSize:
return self # empty RDD
maxSampleSize = numPartitions * 20.0 # 假設有4個分區,maxSampleSize=80
fraction = min(maxSampleSize / max(rddSize, 1), 1.0) # fraction=8/1000,
samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect() # 采樣 8/1000,根據采樣出的數據來估算key的分布情況。
samples = sorted(samples, key=keyfunc) # 對采樣得到的rdd collect之后得到的列表,調用python的sorted方法,完成從小到大排序,得到排好序的列表。
# 得到numPartition-1=3個邊界列表。
bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
for i in range(0, numPartitions - 1)]
# partitionBy根據給定的3個邊界進行分區,分區之后分區間的元素是排好序的。再調用mapPartitions,對每個分區的數據進行排序
def rangePartitioner(k):
p = bisect.bisect_left(bounds, keyfunc(k))
if ascending:
return p
else:
return numPartitions - 1 - p
return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
Stage 1:Shuffle Write。開始shuffle,在map side做shuffle write,根據前面計算出的rangeBounds來重新partition。
通過key值和區間邊界進行比較,如果位於改區間,則分配到該區間對應的分區。
Shuffle write出的數據中,每個partition內的數據雖然尚未排序,但partition之間已經可以保證數據是按照partition index排序的了。
Stage 2:Shuffle Read。然后到reduce side,每個reducer再對拿到的本partition內的數據做排序。這樣完成之后,partition之間的數據在map side就保證有排序,而每個partition內的數據在reduce side也保證有排序,就達到了全局排序的效果。如果在 sortByKey() 后面跟一個 collect() 調用,則它會按照partition index的順序獲取結果數據,最后再把這些數組合並起來,就在本地得到了全局排序后的大數組。
三、調用python方法
from collections import Counter
def category_trans(x):
"""
統計每個分類下面視頻出現的次數
:param x:
:return:
"""
tag = x[0]
videos = x[1]
result = Counter(videos)
r = sorted(result.items(), key=lambda item: item[1],reverse=True)
return tag,[item[0] for item in r]
rdd = rdd.map(lambda x:(x["tag"],[x["video_id"]])) # 此時rdd內的數據為[(tag1:[video_1]),(tag1,[video_2]),(tag2,[video_1]),...]
video_rdd = rdd.reduceByKey(lambda x,y: x+y) # [(tag1,[v1,v2,...]),...]
t2v = video_rdd.map(lambda x: category_trans(x)) # [(tag1,[排好序的列表]),...]
result = t2v.collectAsMap()
四、自定義類
將rdd中元素轉換為自定義類的實例
class MySort():
"""
自定義類的__lt__()方法。python的類中已經自帶了lt,eq,ge,gt...等方法
"""
def __init__(self,num):
self.num = num
def __lt__(self,other):
return self.num<other.num
def __repr__(self):
return str(self.num)
rdd = sc.parallelize([(1, 1),(1, 2), (-1, 1), (-1, -0.5)])
rdd = rdd.map(MySort)
rdd = rdd.sortBy(lambda x: x,ascending=False)
rdd = rdd.foreach(lambda x: print(x))
參考資料