spark 廣播變量


Spark廣播變量

使用廣播變量來優化,廣播變量的原理是:

在每一個Executor中保存一份全局變量,task在執行的時候需要使用和這一份變量就可以,極大的減少了Executor的內存開銷。

Executor中task在執行的時候如果使用到了廣播變量,會找Executor里面的BlockManager來獲取廣播變量。

如果BlockManager中沒有這個關閉變量,會從driver端拉取關閉變量。

在Driver端也有一個blockManagerMaster,其他的task執行的時候直接使用blockmanager中的廣播變量就可以。

package SparkStreaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.broadcast.Broadcast; import java.util.Arrays; import java.util.List; public class BroadCast { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local") .setAppName("BroadCast"); JavaSparkContext sc = new JavaSparkContext(conf); /* * 使用廣播變量,廣播變量的定義必須在driver端,因為sc沒有被序列化不能被發送到Executor端 * */ Broadcast<String> blackname = sc.broadcast("dwj3"); List<String> name = Arrays.asList( "dwj1", "dwj2", "dwj3"); //String blackName = "dwj3";
        JavaRDD<String> nameRDD = sc.parallelize(name); JavaRDD<String> namefilter = nameRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { String blacknames = blackname.getValue(); return !blacknames.equals(s); } }); List<String> lastname = namefilter.collect(); for(String str:lastname){ System.out.println(str); } } }

注意:在聲明廣播變量的時候,必須在driver端,因為sc沒有被序列化,是不能被發送到Executor端的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM