Flink支持廣播變量,就是將數據廣播到具體的taskmanager上,數據存儲在內存中,這樣可以減緩大量的shuffle操作;
比如在數據join階段,不可避免的就是大量的shuffle操作,我們可以把其中一個dataSet廣播出去,一直加載到taskManager的內存中,可以直接在內存中拿數據,避免了大量的shuffle,導致集群性能下降;
注意:因為廣播變量是要把dataset廣播到內存中,所以廣播的數據量不能太大,否則會出現OOM這樣的問題
Broadcast:Broadcast是通過withBroadcastSet(dataset,string)來注冊的
Access:通過getRuntimeContext().getBroadcastVariable(String)訪問廣播變量

/** * Created by angel; */ object BrodCast { def main(args: Array[String]): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment //TODO data2 join data3的數據,使用廣播變量完成 val data2 = new mutable.MutableList[(Int, Long, String)] data2.+=((1, 1L, "Hi")) data2.+=((2, 2L, "Hello")) data2.+=((3, 2L, "Hello world")) val ds1 = env.fromCollection(Random.shuffle(data2)) val data3 = new mutable.MutableList[(Int, Long, Int, String, Long)] data3.+=((1, 1L, 0, "Hallo", 1L)) data3.+=((2, 2L, 1, "Hallo Welt", 2L)) data3.+=((2, 3L, 2, "Hallo Welt wie", 1L)) val ds2 = env.fromCollection(Random.shuffle(data3)) //todo 使用內部類RichMapFunction,提供open和map,可以完成join的操作 val result = ds1.map(new RichMapFunction[(Int , Long , String) , ArrayBuffer[(Int , Long , String , String)]] { var brodCast:mutable.Buffer[(Int, Long, Int, String, Long)] = null override def open(parameters: Configuration): Unit = { import scala.collection.JavaConverters._ //asScala需要使用隱式轉換 brodCast = this.getRuntimeContext.getBroadcastVariable[(Int, Long, Int, String, Long)]("ds2").asScala } override def map(value: (Int, Long, String)):ArrayBuffer[(Int , Long , String , String)] = { val toArray: Array[(Int, Long, Int, String, Long)] = brodCast.toArray val array = new mutable.ArrayBuffer[(Int , Long , String , String)] var index = 0 var a:(Int, Long, String, String) = null while(index < toArray.size){ if(value._2 == toArray(index)._5){ a = (value._1 , value._2 , value._3 , toArray(index)._4) array += a } index = index + 1 } array } }).withBroadcastSet(ds2 , "ds2") println(result.collect()) } }