spark sql 源碼學習Dataset(三)structField、structType、schame


1、structField

源碼結構:

    case class StructField(
        name: String,
        dataType: DataType,
        nullable: Boolean = true,
        metadata: Metadata = Metadata.empty) {}

-----A field inside a StructType
name:The name of this field.
dataType:The data type of this field.
nullable:Indicates if values of this field can be null values.  //指示這個字段的指是否可以為空值
metadata:The metadata of this field. The metadata should be preserved during transformation if the content of the column is not modified, e.g, in selection.
//此字段的元數據。如果不修改列的內容,則在轉換期間應保存元數據,例如。g,在選擇。
一個結構體內部的 一個StructField就像一個SQL中的一個字段一樣,它包含了這個字段的具體信息,可以看如下列子:

def schema_StructField()={
/**
  * StructField 是 一個 case class ,其中是否可以為空,默認是 true,初始元信息是為空
  * 它是作為描述 StructType中的一個字段
  */
    val sf = new StructField("b",IntegerType)
    println(sf.name)//b
    println(sf.dataType)//IntegerType
    println(sf.nullable)//true
    println(sf.metadata)//{}
}

2、structType
A StructType object can be constructed by

StructType(fields: Seq[StructField])
一個StructType對象,可以有多個StructField,同時也可以用名字(name)來提取,就想當於Map可以用key來提取value,但是他StructType提取的是整條字段的信息
在源碼中structType是一個case class,如下:
case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {}

它是繼承Seq的,也就是說Seq的操作,它都擁有,但是從形式上來說,每個元素是用  StructField包住的。

package Dataset
 
import org.apache.spark.sql.types._
 
 
/**
  * Created by root on 9/21/16.
  */
object schemaAnalysis {
  //--------------------------------------------------StructType analysis---------------------------------------
  val struct = StructType(
    StructField("a", IntegerType) ::
      StructField("b", LongType, false) ::
      StructField("c", BooleanType, false) :: Nil)
 
  def schema_StructType()={
    /**
      * 一個scheme是
      */
    import org.apache.spark.sql.types.StructType
    val schemaTyped = new StructType()
      .add("a","int").add("b","string")
    schemaTyped.foreach(println)
    /**
      * StructField(a,IntegerType,true)
      * StructField(b,StringType,true)
      */
  }
  def structType_extracted()={
 
    // Extract a single StructField.
    val singleField_a = struct("a")
    println(singleField_a)
    //省卻的清空下表示:可以為空的,
    //StructField(a,IntegerType,true)
    val singleField_b = struct("b")
    println(singleField_b)
    //StructField(b,LongType,false)
 
    //val nonExisting = struct("d")
    //println(nonExisting)
    //java.lang.IllegalArgumentException: Field "d" does not exist.
 
    // Extract multiple StructFields. Field names are provided in a set.
    // A StructType object will be returned.
    val twoFields = struct(Set("b", "c"))
    println(twoFields)
 
 
    //StructType(StructField(b,LongType,false), StructField(c,BooleanType,false))
    // Any names without matching fields will be ignored.
    // For the case shown below, "d" will be ignored and
    // it is treated as struct(Set("b", "c")).
    val ignoreNonExisting = struct(Set("b", "c", "d"))
    println(ignoreNonExisting)
    // ignoreNonExisting: StructType =
    //   StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
 
    //值得注意的是:當沒有存在的字段的時候,官方文檔說:單個返回的是null,多個返回的是當沒有那個字段
    //但是實驗的時候,報錯---Field d does not exist
    //源碼調用的是apply方法,確實還沒有處理好這部分功能
    //我是用的是spark2.0初始版本
 
  }
  def structType_opration()={
 
    /**
      * 源碼:case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
      * 它是繼承與Seq的,也就是說 Seq的操作,StructType都有
      * 可以查看scala的Seq的操作:http://www.scala-lang.org/api/current/#scala.collection.Seq
      */
    val tmpStruct = StructType(StructField("d", IntegerType)::Nil)
    //集合與集合的操作
    println(struct++tmpStruct)
    // println(struct++:tmpStruct)
    //List(StructField(a,IntegerType,true), StructField(b,LongType,false), StructField(c,BooleanType,false), StructField(d,IntegerType,true))
 
    //集合與元素的操作
    println(struct :+ StructField("d", IntegerType))
 
    //可以用add來進行
 
    println(struct.add("e",IntegerType))
    //StructType(StructField(a,IntegerType,true), StructField(b,LongType,false), StructField(c,BooleanType,false), StructField(e,IntegerType,true))
 
    //head 部分的元素
    println(struct.head)
    //StructField(a,IntegerType,true)
 
 
    //last 部分的元素
    println(struct.last)
    //StructField(c,BooleanType,false)
 
    println(struct.apply("a"))
    //StructField(a,IntegerType,true)
 
    println(struct.treeString)
 
    /**
      * root
       |-- a: integer (nullable = true)
       |-- b: long (nullable = false)
       |-- c: boolean (nullable = false)
      */
 
    println(struct.contains(StructField("f", IntegerType)))
    //false
 
    println(struct.mkString)
    //StructField(a,IntegerType,true)StructField(b,LongType,false)StructField(c,BooleanType,false)
 
    println(struct.prettyJson)
 
    /**
      * {
          "type" : "struct",
          "fields" : [ {
            "name" : "a",
            "type" : "integer",
            "nullable" : true,
            "metadata" : { }
          }, {
            "name" : "b",
            "type" : "long",
            "nullable" : false,
            "metadata" : { }
          }, {
            "name" : "c",
            "type" : "boolean",
            "nullable" : false,
            "metadata" : { }
          } ]
        }
      */
    //更多操作可以查看API:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType
  }
 
 
 
  def main(args: Array[String]) {
    //schema_StructType()
    //structType_extracted()
    structType_opration()
  }
}

3、Schema
---------Schema就是我們數據的數據結構描述,
一個Schema是一個數據結構的描述(比如描述一個Json文件),它可以是在運行的時候隱式導入,或者在編譯的時候就導入。它是用一個StructField集合對象的StructType描述(用一個三元tuple,內部是:name,type.nullability),本來有四個信息的為什么會說是三元數組?其實metadata,你是可以調出來。

def schema_op()={
  case class Person(name: String, age: Long)
  val sparkSession = SparkSession.builder().appName("data set example")
    .master("local").getOrCreate()
  import sparkSession.implicits._
  val rdd = sparkSession.sparkContext.textFile("hdfs://master:9000/src/main/resources/people.txt")
  val dataSet = rdd.map(_.split(",")).map(p =>Person(p(0),p(1).trim.toLong)).toDS()
  println(dataSet.schema)
  //StructType(StructField(name,StringType,true), StructField(age,LongType,false))
 
 
  /**
    * def schema: StructType = queryExecution.analyzed.schema
    *
    * def apply(name: String): StructField = {
    * nameToField.getOrElse(name,
    * throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
    * }
    */
  val tmp: StructField = dataSet.schema("name")
  println(tmp)
  //StructField(name,StringType,true)
 
 
  println(tmp.name)//name
  println(tmp.dataType)//StringType
  println(tmp.nullable)//true
  println(tmp.metadata)//{}
--------------------- 

 



 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM