Broadcast 廣播變量:可以理解為是一個公共的共享變量,我們可以把一個dataset 或者不變的緩存對象(例如map list集合對象等)數據集廣播出去,然后不同的任務在節點上都能夠獲取到,並在每個節點上只會存在一份,而不是在每個並發線程中存在。如果不使用broadcast,則在每個節點中的每個任務中都需要拷貝一份dataset數據集,比較浪費內存(也就是一個節點中可能會存在多份dataset數據)。
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.configuration.Configuration import scala.collection.mutable.ListBuffer object BatchDemoBroadcastScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //1: 准備需要廣播的數據 val broadData = ListBuffer[Tuple2[String,Int]]() broadData.append(("zs",18)) broadData.append(("ls",20)) broadData.append(("ww",17)) //1.1處理需要廣播的數據 val tupleData = env.fromCollection(broadData) val toBroadcastData = tupleData.map(tup=>{ Map(tup._1->tup._2) }) val text = env.fromElements("zs","ls","ww") val result = text.map(new RichMapFunction[String,String] { var listData: java.util.List[Map[String,Int]] = null var allMap = Map[String,Int]() override def open(parameters: Configuration): Unit = { super.open(parameters) this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName") val it = listData.iterator() while (it.hasNext){ val next = it.next() allMap = allMap.++(next) } } override def map(value: String) = { val age = allMap.get(value).get value+","+age } }).withBroadcastSet(toBroadcastData,"broadcastMapName") result.print() } }
1、設置廣播變量
在某個需要用到該廣播變量的算子后調用withBroadcastSet(var1, var2)進行設置,var1為需要廣播變量的變量名,var2是自定義變量名,為String類型。注意,被廣播的變量只能為DataSet類型,不能為List、Int、String等類型。
2、
獲取廣播變量
創建該算子對應的富函數類,例如map函數的富函數類是RichMapFunction,該類有兩個構造參數,第一個參數為算子輸入數據類型,第二個參數為算子輸出數據類型。首先創建一個Traversable[_]接口用於接收廣播變量並初始化為空,接收類型與算子輸入數據類型相對應;然后重寫open函數,通過getRuntimeContext.getBroadcastVariable[_](var)獲取到廣播變量,var即為設置廣播變量時的自定義變量名,類型為String,open函數在算子生命周期的初始化階段便會調用;最后在map方法中對獲取到的廣播變量進行訪問及其它操作。
參考:
https://blog.csdn.net/fct2001140269/article/details/84402798
https://blog.csdn.net/qq_34842671/article/details/80746593
