Repost: http://blog.csdn.net/sparkexpert/article/details/52871000
As the new Spark release has gradually stabilized, I recently planned to upgrade our existing framework to Spark 2.0. It's quite exciting, especially because SQL really is much faster now.
However, I got stuck on one operation: dataframe.map. It worked in Spark 1.x, but on Spark 2.0 it no longer compiles.
The reported error is essentially:
error:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. resDf_upd.map(row =>
{
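For context, here is a minimal sketch of the kind of code that hits this compile error on Spark 2.0. The DataFrame name and column layout are assumptions for illustration, not the original resDf_upd:
// In Spark 2.0 a DataFrame is a Dataset[Row], so .map needs an Encoder for
// the result type. Returning a Row (or any type without an Encoder in scope)
// fails to compile with the error above.
import org.apache.spark.sql.Row
val resDf_upd = spark.read.json("people.json")   // stand-in DataFrame
val mapped = resDf_upd.map(row => Row(row.getString(0), row.getLong(1) + 1))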
There isn't much material online about this problem. But it seemed clear that, now that Dataset unifies DataFrame and RDD, new requirements come with it.
Checking the official Spark documentation turned up this description:
Dataset is Spark SQL’s strongly-typed API for working with structured data, i.e. records with a known schema.
Datasets are lazy and structured query expressions are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation query required to produce the data (for a given Spark SQL session).
A Dataset is a result of executing a query expression against data storage like files, Hive tables or JDBC databases. The structured query expression can be described by a SQL query, a Column-based SQL expression or a Scala/Java lambda function. And that is why Dataset operations are available in three variants.
From this it's clear that to operate on a Dataset, a corresponding encoder is needed. The official docs give this example in particular:
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
So to do a map operation, you first need to define an Encoder.
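If you prefer to stay on the Dataset API, one common route (a sketch with hypothetical names) is to map into a case class, since Product types get an Encoder automatically once spark.implicits._ is imported:
// Sketch: map each Row into a case class; spark.implicits._ provides the Encoder.
case class Person(name: String, age: Long)
import spark.implicits._
val personsDs = resDf_upd.map(row => Person(row.getString(0), row.getLong(1)))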
That would add a fair amount of work to the upgrade. Fortunately, to keep things simpler, a Dataset can also be converted to an RDD, so the earlier dataframe.map only needs to be changed to dataframe.rdd.map, as sketched below.
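A sketch of that workaround (the column accessors are assumptions for illustration):
// Drop to the RDD API, where no Encoder is required for the map result.
import org.apache.spark.sql.Row
val updatedRdd = resDf_upd.rdd.map { row =>
  Row(row.getString(0), row.getLong(1) + 1)
}
// If a DataFrame is needed afterwards, rebuild it from the RDD plus a schema:
// val updatedDf = spark.createDataFrame(updatedRdd, resDf_upd.schema)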