初識Flink廣播變量broadcast


  Broadcast 廣播變量:可以理解為是一個公共的共享變量,我們可以把一個dataset 或者不變的緩存對象(例如map list集合對象等)數據集廣播出去,然后不同的任務在節點上都能夠獲取到,並在每個節點上只會存在一份,而不是在每個並發線程中存在。如果不使用broadcast,則在每個節點中的每個任務中都需要拷貝一份dataset數據集,比較浪費內存(也就是一個節點中可能會存在多份dataset數據)。

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBuffer

object BatchDemoBroadcastScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //1: 准備需要廣播的數據
    val broadData = ListBuffer[Tuple2[String,Int]]()
    broadData.append(("zs",18))
    broadData.append(("ls",20))
    broadData.append(("ww",17))
    //1.1處理需要廣播的數據
    val tupleData = env.fromCollection(broadData)
    val toBroadcastData = tupleData.map(tup=>{
      Map(tup._1->tup._2)
    })

    val text = env.fromElements("zs","ls","ww")

    val result = text.map(new RichMapFunction[String,String] {

      var listData: java.util.List[Map[String,Int]] = null
      var allMap  = Map[String,Int]()

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
        val it = listData.iterator()
        while (it.hasNext){
          val next = it.next()
          allMap = allMap.++(next)
        }
      }

      override def map(value: String) = {
        val age = allMap.get(value).get
        value+","+age
      }
    }).withBroadcastSet(toBroadcastData,"broadcastMapName")

    result.print()
  }
}

1、設置廣播變量
  在某個需要用到該廣播變量的算子后調用withBroadcastSet(var1, var2)進行設置,var1為需要廣播變量的變量名,var2是自定義變量名,為String類型。注意,被廣播的變量只能為DataSet類型,不能為List、Int、String等類型。
2、

獲取廣播變量
創建該算子對應的富函數類,例如map函數的富函數類是RichMapFunction,該類有兩個構造參數,第一個參數為算子輸入數據類型,第二個參數為算子輸出數據類型。首先創建一個Traversable[_]接口用於接收廣播變量並初始化為空,接收類型與算子輸入數據類型相對應;然后重寫open函數,通過getRuntimeContext.getBroadcastVariable[_](var)獲取到廣播變量,var即為設置廣播變量時的自定義變量名,類型為String,open函數在算子生命周期的初始化階段便會調用;最后在map方法中對獲取到的廣播變量進行訪問及其它操作。

 

參考:

https://blog.csdn.net/fct2001140269/article/details/84402798

https://blog.csdn.net/qq_34842671/article/details/80746593


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM