Use case: after running SQL with JavaHiveContext, I wanted to get each field name together with its value, but the job failed with "Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField". The code is as follows:
JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
// fields is captured by the anonymous Function below, so Spark serializes it together with
// the closure; StructField does not implement Serializable, hence the exception
final StructField[] fields = schema.schema().getFields();
JavaRDD<String> result = schema.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Row row) throws Exception {
        StringBuffer out = new StringBuffer();
        for (int i = 0; i < row.length(); i++) {
            out.append(fields[i].getName() + "->" + row.get(i) + ";");
        }
        return out.toString();
    }
});
System.out.println(result.collect());
Looking at the serialization section of the Spark documentation, I saw that a class's serialization can be customized by registering it, so I added the following setting to the conf:
conf.registerKryoClasses(new Class[] { org.apache.spark.sql.api.java.StructField.class });
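For reference, the conf used above would have been set up roughly like this before the contexts were created (a minimal sketch; the app name and master URL are placeholders, and switching spark.serializer to Kryo is an assumption, not something the registration call itself requires):

SparkConf conf = new SparkConf()
        .setAppName("schema-demo")   // placeholder app name
        .setMaster("local[*]")       // placeholder master URL
        // use Kryo for data serialization (assumed; not strictly required for registerKryoClasses)
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// register the class that keeps failing to serialize
conf.registerKryoClasses(new Class[] { org.apache.spark.sql.api.java.StructField.class });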
After running the test again, the same error was thrown: java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField. I don't know why; if anyone does, please leave a comment below.
Since that approach didn't work, I kept searching online and found the following article, excerpted below: http://www.cnblogs.com/zwCHAN/p/4305156.html
Following the first suggestion, I moved the dependent StructField[] variable inside the map function. The code is as follows:
JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
final JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
JavaRDD<String> result = schema.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Row row) throws Exception {
        // fetch the schema inside the function, so no StructField[] is captured by the closure
        StructField[] fields = schema.schema().getFields();
        StringBuffer out = new StringBuffer();
        for (int i = 0; i < row.length(); i++) {
            out.append(fields[i].getName() + "->" + row.get(i) + ";");
        }
        return out.toString();
    }
});
System.out.println(result.collect());
This passed testing, but fetching the schema from the JavaSchemaRDD on every map call is relatively expensive, and inside map only the field names as Strings are actually needed, so I optimized the code further:
JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
// extract the field names on the driver; String[] is serializable, unlike StructField[]
StructField[] fields = schema.schema().getFields();
final String[] fieldsName = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
    fieldsName[i] = fields[i].getName();
}
JavaRDD<String> result = schema.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Row row) throws Exception {
        StringBuffer out = new StringBuffer();
        for (int i = 0; i < row.length(); i++) {
            // each result element looks like "field1->value1;field2->value2;..."
            out.append(fieldsName[i] + "->" + row.get(i) + ";");
        }
        return out.toString();
    }
});
System.out.println(result.collect());
The following content is excerpted from: http://www.cnblogs.com/zwCHAN/p/4305156.html
The error "org.apache.spark.SparkException: Task not serializable" usually appears because a parameter of map, filter, etc. uses an external variable that cannot be serialized. In particular, referencing a member function or member variable of a class (often the current class) forces every member of that class (the whole class) to be serializable. The most common ways to solve this are:
- If possible, define the dependent variables inside the parameter of map, filter, etc.; that way a class that does not support serialization can still be used.
- If possible, move the dependent variables into a small class of their own and make that class serializable; this also reduces the amount of data shipped over the network and improves efficiency (see the sketch after this list).
- If possible, mark the non-serializable parts of the referenced class with the transient keyword, so they are skipped during serialization (also shown in the sketch after this list).
- Make the referenced class serializable.
- The following two I haven't tried:
- Make the NotSerializable object as a static and create it once per machine.
- Call rdd.forEachPartition and create the NotSerializable object in there (the code for this appears at the end of the excerpt below).
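To make the second and third options above concrete, here is a minimal sketch of my own (the class and field names are made up, not part of the article):

import java.io.Serializable;

// a small holder class that carries only what the closure actually needs
class FieldNames implements Serializable {
    private static final long serialVersionUID = 1L;

    // a plain String[] is serializable and gets shipped to the workers
    final String[] names;

    // transient: skipped during serialization, so it will be null on the workers
    transient Object notSerializableHelper;

    FieldNames(String[] names) {
        this.names = names;
    }
}

Only the small FieldNames instance is serialized with the closure; whatever is marked transient is simply left out.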
If you see this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
The above error can be triggered when you initialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

rdd.map(s -> notSerializable.doSomething(s)).collect();
This will trigger that error. Here are some ideas to fix this error:
- Make the class Serializable.
- Declare the instance only within the lambda function passed in map.
- Make the NotSerializable object as a static and create it once per machine.
- Call rdd.forEachPartition and create the NotSerializable object in there like this:
rdd.forEachPartition(iter -> {
    NotSerializable notSerializable = new NotSerializable();
    // ...Now process iter
});
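The "static, once per machine" idea from the list above is not shown in the excerpt; a rough sketch could look like this (NotSerializable and doSomething are the placeholder names used in the snippet above):

class Helper {
    // one instance per JVM (i.e. per executor); never serialized with the closure
    private static NotSerializable instance;

    static synchronized NotSerializable get() {
        if (instance == null) {
            instance = new NotSerializable();
        }
        return instance;
    }
}

// used inside the transformation; the lambda only calls the static method,
// so nothing non-serializable is captured from the driver
rdd.map(s -> Helper.get().doSomething(s)).collect();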
In addition, this answer on Stack Overflow is also clear and easy to follow: http://stackoverflow.com/questions/25914057/task-not-serializable-exception-while-running-apache-spark-job