spark廣播變量定時更新


廣播變量

先來簡單介紹下spark中的廣播變量:

廣播變量允許開發者緩存一個只讀的變量在每台機器上面,而不是每個任務保存一份拷貝。例如,利用廣播變量,我們能夠以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。Spark也嘗試着利用有效的廣播算法去分配廣播變量,以減少通信的成本。

一個廣播變量可以通過調用SparkContext.broadcast(v)方法從一個初始變量v中創建。廣播變量是v的一個包裝變量,它的值可以通過value方法訪問,下面的代碼說明了這個過程:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

從上文我們可以看出廣播變量的聲明很簡單,調用broadcast就能搞定,並且scala中一切可序列化的對象都是可以進行廣播的,這就給了我們很大的想象空間,可以利用廣播變量將一些經常訪問的大變量進行廣播,而不是每個任務保存一份,這樣可以減少資源上的浪費。

更新廣播變量(rebroadcast)

廣播變量可以用來更新一些大的配置變量,比如數據庫中的一張表格,那么有這樣一個問題,如果數據庫當中的配置表格進行了更新,我們需要重新廣播變量該怎么做呢。上文對廣播變量的說明中,我們知道廣播變量是只讀的,也就是說廣播出去的變量沒法再修改,那么我們應該怎么解決這個問題呢?
答案是利用spark中的unpersist函數

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

上文是從spark官方文檔摘抄出來的,我們可以看出,正常來說每個節點的數據是不需要我們操心的,spark會自動按照LRU規則將老數據刪除,如果需要手動刪除可以調用unpersist函數。

那么更新廣播變量的基本思路:將老的廣播變量刪除(unpersist),然后重新廣播一遍新的廣播變量。

public class BroadcastStringPeriodicUpdater {
  private static final int PERIOD = 60 * 1000;
  private static volatile BroadcastStringPeriodicUpdater instance;

  private Broadcast<String> broadcast;
  private long lastUpdate = 0L;

  private BroadcastStringPeriodicUpdater() {}

  public static BroadcastStringPeriodicUpdater getInstance() {
    if (instance == null) {
      synchronized (BroadcastStringPeriodicUpdater.class) {
        if (instance == null) {
          instance = new BroadcastStringPeriodicUpdater();
        }
      }
    }
    return instance;
  }

  public String updateAndGet(SparkContext sc) {
    long now = System.currentTimeMillis();
    long offset = now - lastUpdate;
    if (offset > PERIOD || broadcast == null) {
      if (broadcast != null) {
        broadcast.unpersist();
      }
      lastUpdate = now;
      String value = fetchBroadcastValue();
      broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
    }
    return broadcast.getValue();
  }

  private String fetchBroadcastValue() {

  }
}

用的時候就可以這樣用

String broadcastValue = BroadcastStringPeriodicUpdater.getInstance().updateAndGet(rdd.context());

 

總結

spark中的共享變量是我們能夠在全局做出一些操作,比如record總數的統計更新,一些大變量配置項的廣播等等。而對於廣播變量,我們也可以監控數據庫中的變化,做到定時的重新廣播新的數據表配置情況

 

 

參考:https://www.qcloud.com/community/article/407582

參考:https://mp.weixin.qq.com/s?__biz=MzU3MzgwNTU2Mg==&mid=2247486644&idx=1&sn=d2637a1e918c2b1be4c9fe3d74f75a92&chksm=fd3d4a21ca4ac3377cc8836939cc041cf934bb57f73b6b618fd1de608495d86e278c1c7e4cdc&token=1999457569&lang=zh_CN#rd


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM