一、 廣播變量
廣播變量允許程序員將一個只讀的變量緩存在每台機器上,而不用在任務之間傳遞變量。廣播變量可被用於有效地給每個節點一個大輸入數據集的副本。Spark還嘗試使用高效地廣播算法來分發變量,進而減少通信的開銷。 Spark的動作通過一系列的步驟執行,這些步驟由分布式的洗牌操作分開。Spark自動地廣播每個步驟每個任務需要的通用數據。這些廣播數據被序列化地緩存,在運行任務之前被反序列化出來。這意味着當我們需要在多個階段的任務之間使用相同的數據,或者以反序列化形式緩存數據是十分重要的時候,顯式地創建廣播變量才有用。
二、為什么使用廣播變量
假如我們要共享的變量map,1M
在默認的,task執行的算子中,使用了外部的變量,每個task都會獲取一份變量的副本,
在什么情況下,會出現性能上的惡劣的影響呢?
1000個task。大量task的確都在並行運行。這些task里面都用到了占用1M內存的map,那么首先,map會拷貝1000份副本,通過網絡傳輸到各個task中去,給task使用。總計有1G的數據,會通過網絡傳輸。網絡傳輸的開銷,不容樂觀啊!!!網絡傳輸,也許就會消耗掉你的spark作業運行的總時間的一小部分。
map副本,傳輸到了各個task上之后,是要占用內存的。1個map的確不大,1M;1000個map分布在你的集群中,一下子就耗費掉1G的內存。對性能會有什么影響呢?不必要的內存的消耗和占用,就導致了,你在進行RDD持久化到內存,也許就沒法完全在內存中放下;就只能寫入磁盤,最后導致后續的操作在磁盤IO上消耗性能;
你的task在創建對象的時候,也許會發現堆內存放不下所有對象,也許就會導致頻繁的垃圾回收器的回收,GC。GC的時候,一定是會導致工作線程停止,也就是導致Spark暫停工作那么一點時間。頻繁GC的話,對Spark作業的運行的速度會有相當可觀的影響。
map副本,傳輸到了各個task上之后,是要占用內存的。1個map的確不大,1M;1000個map分布在你的集群中,一下子就耗費掉1G的內存。對性能會有什么影響呢?不必要的內存的消耗和占用,就導致了,你在進行RDD持久化到內存,也許就沒法完全在內存中放下;就只能寫入磁盤,最后導致后續的操作在磁盤IO上消耗性能;
你的task在創建對象的時候,也許會發現堆內存放不下所有對象,也許就會導致頻繁的垃圾回收器的回收,GC。GC的時候,一定是會導致工作線程停止,也就是導致Spark暫停工作那么一點時間。頻繁GC的話,對Spark作業的運行的速度會有相當可觀的影響。
如果說,task使用大變量(1m~100m),明知道會導致性能出現惡劣的影響。那么我們怎么來解決呢?
廣播,Broadcast,將大變量廣播出去。而不是直接使用。
廣播變量的好處,不是每個task一份變量副本,而是變成每個節點的executor才一份副本。這樣的話,就可以讓變量產生的副本大大減少。
廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的
BlockManager中,嘗試獲取變量副本;如果本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其他
節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,
此后這個executor上的task,都會直接使用本地的BlockManager中的副本。
優點:
不是每個task一份副本,而是變成每個節點Executor上一個副本。
1.舉例來說:
50個Executor 1000個task。
一個map10M
默認情況下,1000個task 1000個副本
1000 * 10M = 10 000M = 10 G
10G的數據,網絡傳輸,在集群中,耗費10G的內存資源。
如果使用 廣播變量,
50個Executor ,50個副本,10M*50 = 500M的數據。
網絡傳輸,而且不一定是從Drver傳輸到各個節點,還可能是從就近的節點
的Executor的BlockManager上獲取變量副本,網絡傳輸速度大大增加。
之前 10000M 現在 500M。
20倍網絡傳輸性能的消耗。20倍內存消耗的減少。
三、如何使用
開始使用broadcast變量,使用完后,程序結束記得釋放
sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME) broadCastForLog = None try: broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc) elogging.initLogFromDict(broadCastForLog.value) except StandardError: pass ....... #執行完程序邏輯,記得釋放該變量 if broadCastForLog is not None: broadCastForLog.unpersist(False)
#獲取要被共享的大變量,這里是log配置
class ELogForDistributedApp(object): LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json" @staticmethod def setLogConf2BroadCast(sc): logFilePath = ELogForDistributedApp.LOGHDFSPATH if sc is not None: configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc) broadCast = sc.broadcast(configDict) #globals()['broadCast'] = broadCast #elogging.initLogFromDict(broadCast.value) return broadCast #print broadCast.value else: return None
def initLogFromDict(self):
elogging.initLogFromDict(self.eloggingConfig)
從hdfs中找到相應配置文件
class HDFSOperation(object): @staticmethod def getConfigFromHDFS(hdfsPath,sc): if sc is not None: filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem hadoop_configuration = sc._jsc.hadoopConfiguration() fs =filesystem_class.get(hadoop_configuration) path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path pathObj = path_class(hdfsPath) try: hdfsInStream = fs.open(pathObj) bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream)) except IOError,msg: print str(msg) return None else: return None configStr = '' while True: tmpStr = bufferedReader.readLine() if tmpStr == None: break configStr += tmpStr try: confDict = json.loads(configStr) except IOError,msg: print str(msg) return None return confDict
