DataFrame與數據庫的相互轉化


 

 

在Spark中,Dataframe簡直可以稱為內存中的文本文件。

就像在電腦上直接操作txt、 csv、 json文件一樣簡單。

 

val sparkConf = new SparkConf().setAppName("df2db").setMaster("local[1]")

val sc = new SparkContext(sparkConf)

val sqlContext : SQLContext = new SQLContext(sc)

val df = sqlContext.read.format("csv").option("header","true").load("D:\\spark test\\123")

val snapTable = "env0y"

df.registerTempTable(snapTable)

 

 

以上寥寥數語就把一個csv文件轉為DataFrame並注冊為一張臨時表了,這時候就可以像操作數據庫表一樣操作這個snapTable了:

 

val sql = "SELECT * FROM " + snapTable

val dfTmp = this.sqlContext.sql(sql)

 

 

這樣寫代碼方便簡單,但可惜的是DataFrame畢竟僅僅存在於內存中,我們業務代碼只會輸出算法里規定的結果

 

也就是說,假如結果出錯,不好定位到底是DataFrame本身數據有誤,還是代碼中的SQL寫錯了。。。

 

假如能隨時隨地操作DataFrame就好了,怎么辦呢?

把DataFrame保存到真實的數據庫去:

 

import java.util.Properties

val connectionUrl = "jdbc:sqlserver://10.119.46.153:1433"

val table = "Nettransmit.dbo.df2mssql"

val prop = new Properties()

prop.put("JDBC.Driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")

prop.put("user", "sa")

prop.put("password", "ibas.1597")

val dfWriter = df.write.mode(SaveMode.Overwrite).jdbc(connectionUrl, table, prop)

 

這下好了,如果計算出錯了,我們直接連上數據庫幾條sql就能debug個八九不離十。

唯一要注意的是,DataFrame to Database不是業務要求,所以上面的代碼只能在開發模式或者測試模式的時候存在,正式發布版不應該出現 

 

既然可以寫進去,自然也可以讀出來:

//SqlServer 2 Dataframe

val dfviatable = sqlContext.read.jdbc(connectionUrl,table,prop)

dfviatable.show(10)

 

以上,DataFrame和數據庫之間的極簡交互就完成了,但如果業務中真的有讀寫數據庫的需求了,性能問題可能會成為瓶頸,要注意的。

 

接下來是那么一點點優化。

從csv到DataFrame,我們使用df.printSchema()語句可以在控制台看到類似下面的輸出:

root

 |-- IMSI: string (nullable = true)

|-- UserType: string (nullable = true)

 |-- Total PS Traffic(KB): string (nullable = true)

 |-- Total Online Time(s): string (nullable = true)

 |-- Total CS Traffic (ERL): string (nullable = true)

 |-- Brand: string (nullable = true)

 |-- Series: string (nullable = true)

 |-- OS: string (nullable = true)

 |-- Type: string (nullable = true)

 |-- FDD LTE: string (nullable = true)

 |-- TDD LTE: string (nullable = true)

|-- Only Report 3G Capability: string (nullable = true)

 

也就是說,寫入到數據庫之后每個字段的類型都是string,這顯然是一種浪費。

 

而且很多值完全可以使用int或者double或者bool類型。

 

怎么辦呢?得修改數據庫的“方言”,就像在c++中std::locale 建立本地規則一樣。

 

為了方便起見,封裝一下:

import java.io.{File, FileInputStream}

import java.util.Properties

 

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}

import org.apache.spark.sql.types._

import org.apache.spark.sql.{DataFrame, SaveMode}

 

/**

  * Created by env0y on 2017/11/24.

  */

object dataframe2db {

  def df2db(df: DataFrame,table: String,properties: String) = {

    try{

      val is = new FileInputStream(new File(properties))

      val prop = new Properties()

      prop.load(is)

      val url = String.valueOf(prop.get("url"))//

      JdbcDialects.registerDialect(SQLServerDialect)

      df.write.mode(SaveMode.Overwrite).jdbc(url,table,prop)

      is.close()

    }

  }

 

  val SQLServerDialect = new JdbcDialect {

    override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

 

    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {

      case StringType => Some(JdbcType("NVARCHAR(128)", java.sql.Types.VARCHAR))

      case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))

      case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))

      case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))

      case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))

      case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))

      case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))

      case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))

      case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))

      case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))

      case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))

      // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))

      case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))

      case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")

    }

  }

 

}

 

然后像這樣調用:

dataframe2db.df2db(df,"Nettransmit.dbo.df2dbff","D:\\ database.properties")

 

第三個參數是數據庫的屬性配置文件,內容類似以下:

#\u5F00\u53D1\u6570\u636E\u5E93

driver=com.microsoft.sqlserver.jdbc.SQLServerDriver

url=jdbc:sqlserver://10.119.46.153:1433;databaseName=TspManagement

username=sa

password=ibas.1597

 

這時候再去觀察從DataFrame寫入到數據庫中表會發現,字段屬性都變成NVARCHAR(128)了~~

 

 

 

另外,直接修改DataFrame里面的Schema類型也很簡單:

val df1 = df.withColumn("Only Report 3G Capability",col("Only Report 3G Capability").cast(DataTypes.FloatType))

df1.printSchema()

 

 

就這些,以上Spark的版本是1.6. 涉及的數據庫是sqlServer.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM