Spark广播变量
使用广播变量来优化,广播变量的原理是:
在每一个Executor中保存一份全局变量,task在执行的时候需要使用和这一份变量就可以,极大的减少了Executor的内存开销。
Executor中task在执行的时候如果使用到了广播变量,会找Executor里面的BlockManager来获取广播变量。
如果BlockManager中没有这个关闭变量,会从driver端拉取关闭变量。
在Driver端也有一个blockManagerMaster,其他的task执行的时候直接使用blockmanager中的广播变量就可以。
package SparkStreaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.broadcast.Broadcast; import java.util.Arrays; import java.util.List; public class BroadCast { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local") .setAppName("BroadCast"); JavaSparkContext sc = new JavaSparkContext(conf); /* * 使用广播变量,广播变量的定义必须在driver端,因为sc没有被序列化不能被发送到Executor端 * */ Broadcast<String> blackname = sc.broadcast("dwj3"); List<String> name = Arrays.asList( "dwj1", "dwj2", "dwj3"); //String blackName = "dwj3";
JavaRDD<String> nameRDD = sc.parallelize(name); JavaRDD<String> namefilter = nameRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { String blacknames = blackname.getValue(); return !blacknames.equals(s); } }); List<String> lastname = namefilter.collect(); for(String str:lastname){ System.out.println(str); } } }
注意:在声明广播变量的时候,必须在driver端,因为sc没有被序列化,是不能被发送到Executor端的。