Reposted from: http://www.cnblogs.com/tovin/p/3833985.html
While developing with Spark recently, I noticed that caching a large dataset consumes a great deal of memory. To reduce that memory footprint, I tried out Kryo serialization.
The code consists of three classes: KryoTest, MyRegistrator, and Qualify.
As we know, Spark uses Java's built-in serialization mechanism by default. To switch to Kryo serialization, you only need to add the two serializer-related conf.set lines in the KryoTest class below, which tell Spark which serializer to use.
You also need to add a MyRegistrator class, which registers with Kryo the classes that should be serialized.
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;

public class KryoTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("KryoTest");
        // Replace the default Java serializer with Kryo.
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Point Spark at the registrator that registers our classes with Kryo.
        conf.set("spark.kryo.registrator", "MyRegistrator");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> rdd = sc.textFile("/home/hdpusr/qualifying.txt");
        JavaRDD<Qualify> map = rdd.map(new Function<String, Qualify>() {
            public Qualify call(String v1) throws Exception {
                String[] s = v1.split(",");
                Qualify q = new Qualify();
                q.setA(Integer.parseInt(s[0]));
                q.setB(Long.parseLong(s[1]));
                q.setC(s[2]);
                return q;
            }
        });
        // The serializer only comes into play for serialized storage levels
        // such as MEMORY_AND_DISK_SER.
        map.persist(StorageLevel.MEMORY_AND_DISK_SER());
        System.out.println(map.count());
    }
}
```
```java
import org.apache.spark.serializer.KryoRegistrator;

import com.esotericsoftware.kryo.Kryo;

public class MyRegistrator implements KryoRegistrator {
    // Register every class that Kryo should serialize.
    public void registerClasses(Kryo kryo) {
        kryo.register(Qualify.class);
    }
}
```
```java
import java.io.Serializable;

public class Qualify implements Serializable {
    int a;
    long b;
    String c;

    public int getA() { return a; }
    public void setA(int a) { this.a = a; }
    public long getB() { return b; }
    public void setB(long b) { this.b = b; }
    public String getC() { return c; }
    public void setC(String c) { this.c = c; }
}
```
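As a side note: if your Spark version supports it (the spark.kryo.classesToRegister property was added in Spark 1.2, so this is an assumption to check against your version), you can register classes directly in the configuration instead of writing a registrator class — a minimal sketch:

```java
// Alternative to a custom MyRegistrator (Spark 1.2+, assumption):
// register classes as a comma-separated list of fully qualified names.
conf.set("spark.kryo.classesToRegister", "Qualify");
```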
Now let's compare the effect of Java serialization versus Kryo serialization.
Java serialization: (comparison figure from the original post not reproduced here)
Kryo serialization: (comparison figure from the original post not reproduced here)
The numbers from the actual run show that this saves a considerable amount of memory. When memory is running short, using Kryo serialization is the recommended approach.
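To make the overhead concrete: Java serialization writes the class descriptor, field names, and type signatures into the stream alongside the data, which is one reason Kryo's compact binary format saves memory. A stdlib-only sketch measuring that overhead (the Record class below is a stand-in with the same field layout as Qualify, not part of the original post):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationOverhead {
    // Stand-in with the same field layout as Qualify: int, long, short String.
    static class Record implements Serializable {
        int a = 1;
        long b = 2L;
        String c = "abc";
    }

    // Size of the object's default Java-serialized form, in bytes.
    static int javaSerializedSize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        // The raw field payload is only 4 + 8 + 3 = 15 bytes, but the
        // serialized stream also carries class and field metadata, so the
        // printed size is several times larger.
        System.out.println("Java-serialized size: "
                + javaSerializedSize(new Record()) + " bytes");
    }
}
```

Kryo, by contrast, stores little more than the field payload plus a small class identifier for each registered class, which is where the savings observed above come from.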