問題描述:
將來數據量可能很大,所以ip規則肯定是存儲在HDFS中的,這樣在讀取的時候根據切片數量,會啟動相應的Task,但是數據切片中就可能不會包含所有的ip規則,然后你處理的log文件獲取的ip就找不到對應的省份了。這樣就出現了問題。所以現在需要每個Task都會獲取到全部的ip規則。但是ip規則的數據是分片存放的,怎樣讓Task獲取到全部的ip規則尼?這時就需要將每個切片的IP規則拉取到Spark Submit(Driver)端,然后再通過廣播變量的形式下發到每個Executor中,每個Executor都會持有一份完整的ip規則,這樣Task在處理log文件數據的時候,就可以拉取Executor中的IP規則了。
廣播變量的好處
廣播變量的好處,不是每個task一份變量副本,而是變成每個節點的executor才一份副本。這樣的話,
就可以讓變量產生的副本大大減少。
廣播變量的用法
廣播變量,很簡單
其實就是SparkContext的broadcast()方法,傳入你要廣播的變量,即可
final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast = sc.broadcast(fastutilDateHourExtractMap);
使用廣播變量的時候
直接調用廣播變量(Broadcast類型)的value() / getValue()
可以獲取到之前封裝的廣播變量
廣播變量,初始的時候,就在Drvier上有一份副本。
task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,
嘗試獲取變量副本;如果本地沒有,那么就從Driver遠程拉取變量副本,並保存在本地的BlockManager中;
此后這個executor上的task,都會直接使用本地的BlockManager中的副本。
executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本。
HttpBroadcast TorrentBroadcast(默認)
BlockManager
負責管理某個Executor對應的內存和磁盤上的數據,嘗試在本地BlockManager中找map
舉例來說
50個executor,1000個task。一個map,10M。
默認情況下,1000個task,1000份副本。10G的數據,網絡傳輸,在集群中,耗費10G的內存資源。
如果使用了廣播變量。50個execurtor,50個副本。500M的數據,網絡傳輸,
而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近的節點的executor的bockmanager
上拉取變量副本,網絡傳輸速度大大增加;500M的內存消耗。
10000M,500M,20倍。20倍~以上的網絡傳輸性能消耗的降低;20倍的內存消耗的減少。
對性能的提升和影響,還是很客觀的。
雖然說,不一定會對性能產生決定性的作用。比如運行30分鍾的spark作業,可能做了廣播變量以后,
速度快了2分鍾,或者5分鍾。但是一點一滴的調優,積少成多。最后還是會有效果的。
沒有經過任何調優手段的spark作業,16個小時;三板斧下來,就可以到5個小時;
然后非常重要的一個調優,影響特別大,shuffle調優,2~3個小時;應用了10個以上的性能調優的技術點
,JVM+廣播,30分鍾。16小時~30分鍾。
那最后我們做一下,怎么做?就是把dateHourExtractMap做成廣播變量Broadcast