Flink 的廣播變量


Flink 支持廣播變量,就是將數據廣播到具體的 taskmanager 上,數據存儲在內存中,這樣可以減緩大量的 shuffle 操作;

比如在數據 join 階段,不可避免的就是大量的 shuffle 操作,我們可以把其中一個 dataSet 廣播出去,一直加載到 taskManager 的內存中,可以直接在內存中拿數據,避免了大量的 shuffle,導致集群性能下降;

廣播變量創建后,它可以運行在集群中的任何 function 上,而不需要多次傳遞給集群節點。另外需要記住,不應該修改廣播變量,這樣才能確保每個節

點獲取到的值都是一致的。

一句話解釋,可以理解為是一個公共的共享變量,我們可以把一個 dataset數據集廣播出去,然后不同的 task 在節點上都能夠獲取到,這個數據在每個節

點上只會存在一份。如果不使用 broadcast,則在每個節點中的每個 task 中都需要拷貝一份 dataset 數據集,比較浪費內存(也就是一個節點中可能會存在多份dataset 數據)。

注意:因為廣播變量是要把 dataset 廣播到內存中,所以廣播的數據量不能太大,否則會出現 OOM 這樣的問題

  • Broadcast:Broadcast 是通過 withBroadcastSet(dataset,string)來注冊的
  • Access:通過 getRuntimeContext().getBroadcastVariable(String)訪問廣播變量

   

操作步驟

1:初始化數據

DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)

2:廣播數據

.withBroadcastSet(toBroadcast, "broadcastSetName");

3:獲取數據

Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

   

  

package com.starzy

   

import org.apache.flink.api.common.functions.RichMapFunction

import org.apache.flink.api.scala.ExecutionEnvironment

import org.apache.flink.configuration.Configuration

import org.apache.flink.api.scala._

import scala.collection.mutable

import scala.collection.mutable.ArrayBuffer

import scala.util.Random

   

object BrodCast {

def main(args: Array[String]): Unit = {

val env: ExecutionEnvironment = ExecutionEnvironment. getExecutionEnvironment

   

//TODO data2 join data3 的數據,使用廣播變量完成 的數據,使用廣播變量完成

val data2 = new mutable.MutableList[(Int, Long, String)]

data2.+=((1, 1L, "Hi"))

data2.+=((2, 2L, "Hello"))

data2.+=((3, 2L, "Hello world"))

val ds1 = env.fromCollection(Random.shuffle(data2))

val data3 = new mutable.MutableList[(Int, Long, Int, String, Long)]

data3.+=((1, 1L, 0, "Hallo", 1L))

data3.+=((2, 2L, 1, "Hallo Welt", 2L))

data3.+=((2, 3L, 2, "Hallo Welt wie", 1L))

val ds2 = env.fromCollection(Random.shuffle(data3))

   

   

//todo 使用內部類 RichMapFunction ,提供 open map ,可以完成 join 的操作 的操作

val result = ds1.map(new RichMapFunction[(Int , Long , String) , ArrayBuffer[(Int , Long , String , String)]] {

   

var brodCast :mutable.Buffer[(Int, Long, Int, String, Long)] = null

override def open(parameters: Configuration): Unit = {

import scala.collection.JavaConverters._

//asScala 需要使用隱式轉換

brodCast = this.getRuntimeContext.getBroadcastVariable[(Int, Long, Int, String, Long)]("ds2").asScala

}

override def map(value: (Int, Long, String)):ArrayBuffer[(Int , Long , String , String)] = {

val toArray: Array[(Int, Long, Int, String, Long)] = brodCast .toArray

val array = new mutable.ArrayBuffer[(Int , Long , String , String)]

var index = 0

var a:(Int, Long, String, String) = null

while(index < toArray.size){

if(value._2 == toArray(index)._5){

a = (value._1 , value._2 , value._3 , toArray(index)._4)

array += a

}

index = index + 1

}

array

}

}).withBroadcastSet(ds2 , "ds2")

println (result.collect())

}

}

   

   

   

   

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM