Spark廣播變量
使用廣播變量來優化,廣播變量的原理是:
在每一個Executor中保存一份全局變量,task在執行的時候需要使用和這一份變量就可以,極大的減少了Executor的內存開銷。
Executor中task在執行的時候如果使用到了廣播變量,會找Executor里面的BlockManager來獲取廣播變量。
如果BlockManager中沒有這個關閉變量,會從driver端拉取關閉變量。
在Driver端也有一個blockManagerMaster,其他的task執行的時候直接使用blockmanager中的廣播變量就可以。
package SparkStreaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.broadcast.Broadcast; import java.util.Arrays; import java.util.List; public class BroadCast { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local") .setAppName("BroadCast"); JavaSparkContext sc = new JavaSparkContext(conf); /* * 使用廣播變量,廣播變量的定義必須在driver端,因為sc沒有被序列化不能被發送到Executor端 * */ Broadcast<String> blackname = sc.broadcast("dwj3"); List<String> name = Arrays.asList( "dwj1", "dwj2", "dwj3"); //String blackName = "dwj3";
JavaRDD<String> nameRDD = sc.parallelize(name); JavaRDD<String> namefilter = nameRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { String blacknames = blackname.getValue(); return !blacknames.equals(s); } }); List<String> lastname = namefilter.collect(); for(String str:lastname){ System.out.println(str); } } }
注意:在聲明廣播變量的時候,必須在driver端,因為sc沒有被序列化,是不能被發送到Executor端的。