徹底解決,sparkSQL讀取csv中Map字段類型的問題


 

 

先說歷史情況: 

在spark2.0版本之前(比如1.6版本),spark sql如果讀取csv格式數據,要導入:

<dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
</dependency>

代碼:

spark
     .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("csv數據路徑")

 

在spark2.0以后,spark把databricks的代碼內置到了自己的源碼系統中,在通過一套非常簡單的模板API就能讀取到csv數據,比如:

spark.read.csv("csv數據路徑")

 

以上操作,都去普通簡單的數據類型均沒有問題,比如讀取這些類型:

ByteType、ShortType、IntegerType、LongType、FloatType、DoubleType、BooleanType、DecimalType、TimestampType、DateType、StringType

但是一旦讀取復雜一些的,比如你讀取csv文件,將數據寫入hive表中【但是hive表的schema中有個字段的類型是Map<String,String>】,那么以上的方式就出現不兼容了;

我的csv數據格式:

 

 

 

 

我的生產問題:

生產spark版本切換,要從1.6版本直接升級到目前2.x最穩定的版本(2.4.6)

首先是歷史原因,會有一些支線業務,通過rsink把csv文件分發到機器A上,然后spark會讀取csv文件,將csv文件內容以orc格式寫入hive某張表

但是之前說過的問題,sparksql讀取csv文件 ,不知道Map類型,所以生產報錯;

 

解決:

思路就是:通過自定義數據源的方式來支持這個Map格式,自定義數據源的思路看我之前寫的這篇文章:

關於自定義sparkSQL數據源(Hbase)操作中遇到的坑
https://www.cnblogs.com/niutao/p/10801259.html

如何讓spark sql寫mysql的時候支持update操作
https://www.cnblogs.com/niutao/p/11809695.html

 

第一步:

去git上把com.databricks:spark-csv的源碼拉下來

git clone https://github.com/databricks/spark-csv.git

第二步:

這里面實現的思路也是通過自定義數據源的方式來支持spark讀取csv的;

所以,復制com.databricks:spark-csv源碼到你的工程下(隨便新建一個spark工程),比如:

 

 上面就是復制過來的源碼,然后找到TypeCast這個object;

spark讀取csv,適配csv里面的類型,就是在這個TypeCast.castTo代碼中進行適配的:

/**
   * Casts given string datum to specified type.
   * Currently we do not support complex types (ArrayType, MapType, StructType).
   *
   * For string types, this is simply the datum. For other types.
   * For other nullable types, this is null if the string datum is empty.
   *
   * @param datum string value
   * @param castType SparkSQL type
   */
  private[csv] def castTo(
      datum: String,
      castType: DataType,
      nullable: Boolean = true,
      treatEmptyValuesAsNulls: Boolean = false,
      nullValue: String = "",
      dateFormatter: SimpleDateFormat = null): Any = {

    if (datum == nullValue &&
      nullable ||
      (treatEmptyValuesAsNulls && datum == "")){
      null
    } else {
      castType match {
        case _: ByteType => datum.toByte
        case _: ShortType => datum.toShort
        case _: IntegerType => datum.toInt
        case _: LongType => datum.toLong
        case _: FloatType => Try(datum.toFloat)
          .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
        case _: DoubleType => Try(datum.toDouble)
          .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
        case _: BooleanType => datum.toBoolean
        case _: DecimalType => new BigDecimal(datum.replaceAll(",", ""))
        case _: TimestampType if dateFormatter != null =>
          new Timestamp(dateFormatter.parse(datum).getTime)
        case _: TimestampType => Timestamp.valueOf(datum)
        case _: DateType if dateFormatter != null =>
          new Date(dateFormatter.parse(datum).getTime)
        case _: DateType => Date.valueOf(datum)
        case _: StringType => datum
        //適配Map類型
        case _: MapType => {
          var map = Map[String,String]()
          val arr = datum.split(":")
          map += (arr(0) -> arr(1))
          map
        }
        case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
      }
    }
  }

如上,添加一個適配Map類型即可

 

第三步:

對這個工程打包,最終生成一個jar包,必須叫做example.jar

然后導入這個新包,就完美解決讀取csv中Map類型了

 

代碼放在git了:https://github.com/niutaofan/pareCSV.git

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM