spark基礎知識請參考spark官網:http://spark.apache.org/docs/1.2.1/quick-start.html
無論是mapreduce還是spark ,分布式框架的性能優化方向大致分為:負載均衡、網絡傳輸和磁盤I/O 這三塊。而spark是基於內存的計算框架,因此在編寫應用時需要充分利用其內存計算特征。本篇主要針對
spark應用中的join問題進行討論,關於集群參數的優化會在另一篇文章中提及。
在傳統的數據庫平台和分布式計算平台,join的性能消耗都是很可觀的,對spark來說如果join的表比較大,那么在shuffle時網絡及磁盤壓力會明顯提升,嚴重時可能會造成excutor失敗導致任務無法進行下去,
對這種join的優化方法主要是采用map和filter來改變join的實現方式,減少shuffle階段的網絡和磁盤I/O。下面以表的數據量大小分兩部分來討論。
大表:數據量較大的表
小表:數據量較小的表
一、大表與小表之間的join
這種join是大部分業務場景的主要join方式,將小表以broadcast的形式分發到每個executor后對大表進行filter操作,以下對每種join進行示例說明(兼容表中ID不唯一的情況)。
1、leftOuterJoin
>>>d1=sc.parallelize([(1,2),(2,3),(2,4),(3,4)])
>>>d2=sc.parallelize([(1,'a'),(2,'b'),(1,'d'),(5,'2')])
原生實現方式:
>>>d1.leftOuterJoin(d2).collect()
>>>[(1, (2, 'a')), (1, (2, 'd')), (2, (4, 'b')), (2, (3, 'b')), (3, (4, None))]
map實現方式(小表在右的實現方式,小表在左的情況會稍微復雜些,需要多一些操作操作,實際場景中不多見):
def doJoin(row): result=[] if row[1][1] is not None: for i in row[1][1]: result+=[(row[0],(row[1][0],i))] else: result+=[row] return result d2_map={} for i in d2.groupByKey().collect(): d2_map[i[0]]=i[1] d2_broadcast=sc.broadcast(d2_map) d2_dict=d2_broadcast.value d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).flatMap(doJoin).collect()
>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b')), (3, (4, None))]
2、join
這里的join指的是innerjoin即只取出匹配到的數據項,只需要在上面的實現方式中加個filter即可
d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).filter(lambda row:row[1][1] is not None).flatMap(doJoin).collect()
>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b'))]
二、大表與大表之間的join(Reduce-join)
大表之間的join無法通過緩存數據來達到優化目的,因此需要把優化的重點放在分區效率及key的設計上
1、join的key值盡量使用數值類型,減少分區及shuffle的操作時間,在join時數值類型的key值在匹配時更快
2、將過濾條件放在join之前,使得join的數據量盡量最少
3、在join之前將兩個表按相同分區數進行重新分區
reduce-join:指將兩個表按key值進行分區,相同key的數據會被分在同一個分區,最后使用mapPartition進行join操作。
4、如果需要減少分區和並行度,請使用coalesce 而非repartition 方法。
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
三、其它優化方式
1、同一份數據被多次用到,在讀入時進行緩存,后面直接使用,例如配置表,如果數據量不大則進行broadcast,否則使用cache
2、盡量減少重復計算,同樣的計算邏輯只計算一次
3、幾個優化參數
spark.akka.frameSize 1000 集群間通信 一幀數據的大小,設置太小可能會導致通信延遲
spark.akka.timeout 100 通信等待最長時間(秒為單位)
spark.akka.heartbeat.pauses 600 心跳失敗最大間隔(秒為單位)
spark.serializer org.apache.spark.serializer.KryoSerializer 序列化方式(sprak自己的實現方式)
spark.sql.autoBroadcastJoinThreshold -1 禁止自動broadcast表
spark.shuffle.consolidateFiles true shuffle 自動合並小文件
四、后續優化方向
1、內存優化:對象所占用的內存,訪問對象的消耗以及垃圾回收(garbage collection)所占用的開銷
2、優化數據結構
3、優化RDD存儲
4、並行度