
先說歷史情況:
在spark2.0版本之前(比如1.6版本),spark sql如果讀取csv格式數據,要導入:
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.11</artifactId>
</dependency>
代碼:
spark .format("com.databricks.spark.csv") .option("header", "true") .load("csv數據路徑")
在spark2.0以后,spark把databricks的代碼內置到了自己的源碼系統中,在通過一套非常簡單的模板API就能讀取到csv數據,比如:
spark.read.csv("csv數據路徑")
以上操作,都去普通簡單的數據類型均沒有問題,比如讀取這些類型:
ByteType、ShortType、IntegerType、LongType、FloatType、DoubleType、BooleanType、DecimalType、TimestampType、DateType、StringType
但是一旦讀取復雜一些的,比如你讀取csv文件,將數據寫入hive表中【但是hive表的schema中有個字段的類型是Map<String,String>】,那么以上的方式就出現不兼容了;
我的csv數據格式:

我的生產問題:
生產spark版本切換,要從1.6版本直接升級到目前2.x最穩定的版本(2.4.6)
首先是歷史原因,會有一些支線業務,通過rsink把csv文件分發到機器A上,然后spark會讀取csv文件,將csv文件內容以orc格式寫入hive某張表
但是之前說過的問題,sparksql讀取csv文件 ,不知道Map類型,所以生產報錯;
解決:
思路就是:通過自定義數據源的方式來支持這個Map格式,自定義數據源的思路看我之前寫的這篇文章:
關於自定義sparkSQL數據源(Hbase)操作中遇到的坑
https://www.cnblogs.com/niutao/p/10801259.html
如何讓spark sql寫mysql的時候支持update操作
https://www.cnblogs.com/niutao/p/11809695.html
第一步:
去git上把com.databricks:spark-csv的源碼拉下來
git clone https://github.com/databricks/spark-csv.git
第二步:
這里面實現的思路也是通過自定義數據源的方式來支持spark讀取csv的;
所以,復制com.databricks:spark-csv源碼到你的工程下(隨便新建一個spark工程),比如:

上面就是復制過來的源碼,然后找到TypeCast這個object;
spark讀取csv,適配csv里面的類型,就是在這個TypeCast.castTo代碼中進行適配的:
/** * Casts given string datum to specified type. * Currently we do not support complex types (ArrayType, MapType, StructType). * * For string types, this is simply the datum. For other types. * For other nullable types, this is null if the string datum is empty. * * @param datum string value * @param castType SparkSQL type */ private[csv] def castTo( datum: String, castType: DataType, nullable: Boolean = true, treatEmptyValuesAsNulls: Boolean = false, nullValue: String = "", dateFormatter: SimpleDateFormat = null): Any = { if (datum == nullValue && nullable || (treatEmptyValuesAsNulls && datum == "")){ null } else { castType match { case _: ByteType => datum.toByte case _: ShortType => datum.toShort case _: IntegerType => datum.toInt case _: LongType => datum.toLong case _: FloatType => Try(datum.toFloat) .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue()) case _: DoubleType => Try(datum.toDouble) .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue()) case _: BooleanType => datum.toBoolean case _: DecimalType => new BigDecimal(datum.replaceAll(",", "")) case _: TimestampType if dateFormatter != null => new Timestamp(dateFormatter.parse(datum).getTime) case _: TimestampType => Timestamp.valueOf(datum) case _: DateType if dateFormatter != null => new Date(dateFormatter.parse(datum).getTime) case _: DateType => Date.valueOf(datum) case _: StringType => datum //適配Map類型 case _: MapType => { var map = Map[String,String]() val arr = datum.split(":") map += (arr(0) -> arr(1)) map } case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}") } } }
如上,添加一個適配Map類型即可
第三步:
對這個工程打包,最終生成一個jar包,必須叫做example.jar
然后導入這個新包,就完美解決讀取csv中Map類型了
代碼放在git了:https://github.com/niutaofan/pareCSV.git
